// Source blob: 79e1872f8eeb5dee822db3837dfaefcf813aa0ca
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers describing the Fn API and bootstrapping.
*
* TODO: Usage of plural names in lists looks awkward in Java
* e.g. getOutputsMap, addCodersBuilder
*
* TODO: gRPC / proto field names conflict with generated code
* e.g. "class" in java, "output" in python
*/
syntax = "proto3";
/* TODO: Consider consolidating common components in another package
* and language namespaces for re-use with Runner Api.
*/
package org.apache.beam.fn.v1;
option java_package = "org.apache.beam.fn.v1";
option java_outer_classname = "BeamFnApi";
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
/*
* Constructs that define the pipeline shape.
*
* These are mostly unstable due to the missing pieces to be shared with
* the Runner Api like windowing strategy, display data, .... There are still
* some modelling questions related to whether a side input is modelled
* as another field on a PrimitiveTransform or as part of inputs and we
* still are missing things like the CompositeTransform.
*/
// A representation of an input or output definition on a primitive transform.
// A Target is identified by the id of the owning PrimitiveTransform together
// with the local name of one of its inputs or outputs.
// Stable
message Target {
  // A repeated list of target definitions. For example, this is the value type
  // of PrimitiveTransform.inputs, where a single input name may be associated
  // with several targets.
  message List {
    repeated Target target = 1;
  }

  // (Required) The id of the PrimitiveTransform which is the target.
  string primitive_transform_reference = 1;

  // (Required) The local name of an input or output defined on the primitive
  // transform.
  string name = 2;
}
// Information defining a PCollection.
message PCollection {
  // (Required) A reference to a coder.
  //
  // NOTE(review): presumably this matches the FunctionSpec.id of one of the
  // Coder entries in the enclosing ProcessBundleDescriptor.coders — confirm.
  string coder_reference = 1;

  // TODO: Windowing strategy, ...
}
// A primitive transform within Apache Beam.
//
// Note that protobuf map fields have no defined ordering, so consumers must
// not rely on the iteration order of inputs, outputs, or side_inputs.
message PrimitiveTransform {
  // (Required) A pipeline level unique id which can be used as a reference to
  // refer to this.
  string id = 1;

  // (Required) A function spec that is used by this primitive
  // transform to process data.
  FunctionSpec function_spec = 2;

  // A map of distinct input names to target definitions.
  // For example, in CoGbk this represents the tag name associated with each
  // distinct input name and a list of primitive transforms that are associated
  // with the specified input.
  map<string, Target.List> inputs = 3;

  // A map from local output name to PCollection definitions. For example, in
  // DoFn this represents the tag name associated with each distinct output.
  map<string, PCollection> outputs = 4;

  // TODO: Should we model side inputs as a special type of input for a
  // primitive transform or should it be modeled as the relationship that
  // the predecessor input will be a view primitive transform.
  // A map from side input names to side inputs.
  map<string, SideInput> side_inputs = 5;

  // The user name of this step.
  // TODO: This should really be in display data and not at this level
  string step_name = 6;
}
/*
* User Definable Functions
*
* This is still unstable mainly due to how we model the side input.
*/
// Defines the common elements of user-definable functions, to allow the SDK to
// express the information the runner needs to execute work.
// Stable
message FunctionSpec {
  // (Required) A pipeline level unique id which can be used as a reference to
  // refer to this.
  string id = 1;

  // (Required) A globally unique name representing this user definable
  // function.
  //
  // User definable functions use the urn encodings registered such that another
  // may implement the user definable function within another language.
  //
  // For example:
  //   urn:org.apache.beam:coder:kv:1.0
  string urn = 2;

  // (Required) Reference to specification of execution environment required to
  // invoke this function.
  //
  // NOTE(review): presumably this is the id of an environment description such
  // as DockerContainer.id — confirm.
  string environment_reference = 3;

  // Data used to parameterize this function. Depending on the urn, this may be
  // optional or required. The concrete message type packed inside is
  // identified by the Any's type URL.
  google.protobuf.Any data = 4;
}
// A side input of a primitive transform (see PrimitiveTransform.side_inputs).
message SideInput {
  // TODO: Coder?

  // The target (an input or output of a primitive transform) this side input
  // refers to.
  // For RunnerAPI.
  Target input = 1;

  // The function spec of this side input's view function.
  // For FnAPI.
  FunctionSpec view_fn = 2;
}
// Defines how to encode values into byte streams and decode values from byte
// streams. A coder can be parameterized by additional properties which may or
// may not be language agnostic.
//
// Coders using the urn:org.apache.beam:coder namespace must have their
// encodings registered such that another may implement the encoding within
// another language.
//
// For example:
//   urn:org.apache.beam:coder:kv:1.0
//   urn:org.apache.beam:coder:iterable:1.0
// Stable
message Coder {
  // TODO: This looks weird when compared to the other function specs
  // which use URN to differentiate themselves. Should "Coder" be embedded
  // inside the FunctionSpec data block?
  // The data associated with this coder used to reconstruct it.
  FunctionSpec function_spec = 1;

  // A list of component coder references.
  //
  // For a key-value coder, there must be exactly two component coder references
  // where the first reference represents the key coder and the second reference
  // is the value coder.
  //
  // For an iterable coder, there must be exactly one component coder reference
  // representing the value coder.
  //
  // TODO: Perhaps this is redundant with the data of the FunctionSpec
  // for known coders?
  repeated string component_coder_reference = 2;
}
// A descriptor for connecting to a remote port using the Beam Fn Data API.
// Allows for communication between two environments (for example between the
// runner and the SDK).
// Stable
message RemoteGrpcPort {
  // (Required) An API descriptor which describes where to
  // connect to including any authentication that is required.
  ApiServiceDescriptor api_service_descriptor = 1;
}
/*
* Control Plane API
*
* Progress reporting and splitting still need further vetting. Also, this may change
* with the addition of new types of instructions/responses related to metrics.
*/
// An API that describes the work that an SDK harness is meant to do.
//
// Note the direction of the streams: the client-to-server stream carries
// InstructionResponse messages and the server-to-client stream carries
// InstructionRequest messages. The SDK harness is therefore the gRPC client
// and the runner hosts this service.
// Stable
service BeamFnControl {
  // Instructions sent by the runner to the SDK requesting different types
  // of work.
  rpc Control(
    // A stream of responses to instructions the SDK was asked to perform.
    stream InstructionResponse
  ) returns (
    // A stream of instructions the SDK is requested to perform.
    stream InstructionRequest
  ) {}
}
// A request sent by a runner which the SDK is asked to fulfill.
// Stable
message InstructionRequest {
  // (Required) A unique identifier provided by the runner which represents
  // this request's execution. The InstructionResponse MUST have the matching
  // id.
  string instruction_id = 1;

  // (Required) A request that the SDK Harness needs to interpret. The field
  // numbers of these alternatives mirror those of InstructionResponse.response.
  oneof request {
    RegisterRequest register = 1000;
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
  }
}
// The response for an associated request the SDK had been asked to fulfill.
// Stable
message InstructionResponse {
  // (Required) A reference provided by the runner which represents a request's
  // execution. The InstructionResponse MUST have the matching id when
  // responding to the runner.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  //
  // Because this is a proto3 string, an empty value is indistinguishable from
  // unset; an empty error therefore means the instruction did not fail.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches (e.g. a
  // RegisterRequest is answered with a RegisterResponse).
  oneof response {
    RegisterResponse register = 1000;
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
  }
}
// A list of objects which can be referred to by the runner in
// future requests.
// Stable
message RegisterRequest {
  // (Optional) The set of descriptors used to process bundles. These may be
  // referred to in later requests, e.g. by
  // ProcessBundleRequest.process_bundle_descriptor_reference.
  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
}
// The response to a RegisterRequest. It currently carries no fields; success
// is signalled by the enclosing InstructionResponse having no error.
// Stable
message RegisterResponse {
}
// A descriptor of references used when processing a bundle.
// Stable
message ProcessBundleDescriptor {
  // (Required) A pipeline level unique id which can be used as a reference to
  // refer to this.
  string id = 1;

  // (Required) A list of primitive transforms that should
  // be used to construct the bundle processing graph.
  repeated PrimitiveTransform primitive_transform = 2;

  // NOTE(review): field number 3 is not used by this message. If it was ever
  // assigned to a field that has since been removed, declare `reserved 3;` so
  // it cannot be reused with a different meaning.

  // (Required) The set of all coders referenced in this bundle.
  repeated Coder coders = 4;
}
// A request to process a given bundle.
// Stable
message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  //
  // NOTE(review): presumably this is the id of a ProcessBundleDescriptor
  // previously supplied in a RegisterRequest — confirm.
  string process_bundle_descriptor_reference = 1;

  // (Optional) A list of cache tokens that can be used by an SDK to cache
  // data looked up using the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}
// The response to a ProcessBundleRequest. It currently carries no fields.
// Stable
message ProcessBundleResponse {
}
// A request for the progress of an active process bundle request.
message ProcessBundleProgressRequest {
  // (Required) A reference to an active process bundle request with the given
  // instruction id.
  string instruction_reference = 1;
}
// The progress of an active process bundle request.
message ProcessBundleProgressResponse {
  // (Required) The finished amount of work. A monotonically increasing
  // unitless measure of work finished.
  double finished_work = 1;

  // (Required) The known amount of backlog for the process bundle request.
  // Computed as:
  //   (estimated known work - finished work) / finished work
  //
  // NOTE(review): this formula is undefined when finished_work is 0. Confirm
  // how backlog should be reported before any work has finished.
  double backlog = 2;
}
// A request to split an active process bundle request.
message ProcessBundleSplitRequest {
  // (Required) A reference to an active process bundle request with the given
  // instruction id.
  string instruction_reference = 1;

  // (Required) The fraction of work (when compared to the known amount of work)
  // the process bundle request should try to split at.
  //
  // NOTE(review): presumably expected to lie in [0, 1] — confirm.
  double fraction = 2;
}
// urn:org.apache.beam:restriction:element-count:1.0
message ElementCountRestriction {
  // A restriction representing the number of elements that should be processed.
  // Effectively the range [0, count]
  //
  // NOTE(review): the closed range [0, count] contains count + 1 integer
  // positions. If elements are zero-indexed, processing `count` elements is
  // [0, count). Confirm the intended bound and keep it consistent with
  // ElementCountSkipRestriction.
  int64 count = 1;
}
// urn:org.apache.beam:restriction:element-count-skip:1.0
message ElementCountSkipRestriction {
  // A restriction representing the number of elements that should be skipped.
  // Effectively the range (count, infinity]
  //
  // NOTE(review): if the skipped elements are the first `count` zero-indexed
  // elements, the remainder is [count, infinity). Confirm the intended bound
  // and keep it consistent with ElementCountRestriction.
  int64 count = 1;
}
// Each primitive transform that is splittable is defined by a restriction
// it is currently processing. During splitting, that currently active
// restriction (R_initial) is split into 2 components:
//   * a restriction (R_done) representing all elements that will be fully
//     processed
//   * a restriction (R_todo) representing all elements that will not be fully
//     processed
//
// where:
//   R_initial = R_done ⋃ R_todo
message PrimitiveTransformSplit {
  // (Required) A reference to a primitive transform with the given id that
  // is part of the active process bundle request with the given instruction
  // id.
  string primitive_transform_reference = 1;

  // (Required) A function specification describing the restriction
  // that has been completed by the primitive transform (R_done).
  //
  // For example, a remote GRPC source will have a specific urn and data
  // block containing an ElementCountRestriction.
  FunctionSpec completed_restriction = 2;

  // (Required) A function specification describing the restriction
  // representing the remainder of work for the primitive transform (R_todo).
  //
  // For example, a remote GRPC source will have a specific urn and data
  // block containing an ElementCountSkipRestriction.
  FunctionSpec remaining_restriction = 3;
}
// The response to a ProcessBundleSplitRequest.
message ProcessBundleSplitResponse {
  // (Optional) A set of split responses for a currently active work item.
  //
  // If primitive transform B is a descendant of primitive transform A and both
  // A and B report a split, then B's restriction is reported as an element
  // restriction pair and thus the fully reported restriction is:
  //   R = A_done
  //     ⋃ (A_boundary ⋂ B_done)
  //     ⋃ (A_boundary ⋂ B_todo)
  //     ⋃ A_todo
  // If there is a descendant of B named C, then C would similarly report a
  // set of element pair restrictions.
  //
  // This restriction is processed and completed by the currently active process
  // bundle request:
  //   A_done ⋃ (A_boundary ⋂ B_done)
  // and these restrictions will be processed by future process bundle requests:
  //   A_boundary ⋂ B_todo (passed to SDF B directly)
  //   A_todo (passed to SDF A directly)
  //
  // If primitive transforms B and C are siblings and descendants of A, and A,
  // B, and C all report a split, then B and C's restrictions are relative to
  // A's:
  //   R = A_done
  //     ⋃ (A_boundary ⋂ B_done)
  //     ⋃ (A_boundary ⋂ B_todo)
  //     ⋃ (A_boundary ⋂ C_done)
  //     ⋃ (A_boundary ⋂ C_todo)
  //     ⋃ A_todo
  // If there is no descendant of B or C also reporting a split, then
  //   B_boundary = ∅ and C_boundary = ∅
  //
  // This restriction is processed and completed by the currently active process
  // bundle request:
  //   A_done ⋃ (A_boundary ⋂ B_done)
  //          ⋃ (A_boundary ⋂ C_done)
  // and these restrictions will be processed by future process bundle requests:
  //   A_boundary ⋂ B_todo (passed to SDF B directly)
  //   A_boundary ⋂ C_todo (passed to SDF C directly)
  //   A_todo (passed to SDF A directly)
  //
  // Note that descendants' splits should only be reported if it is inexpensive
  // to compute the boundary restriction intersected with descendants' splits.
  // Also note that the boundary restriction may represent a set of elements
  // produced by a parent primitive transform which can not be split at each
  // element or that there are intermediate unsplittable primitive transforms
  // between an ancestor splittable function and a descendant splittable
  // function which may have more than one output per element. Finally note
  // that the descendant splits should only be reported if the split
  // information is relatively compact.
  repeated PrimitiveTransformSplit splits = 1;
}
/*
* Data Plane API
*/
// Messages used to represent logical byte streams.
// Stable
message Elements {
  // Represents multiple encoded elements in nested context for a given named
  // instruction and target.
  message Data {
    // (Required) A reference to an active instruction request with the given
    // instruction id.
    string instruction_reference = 1;

    // (Required) A definition representing a consumer or producer of this data.
    // If received by a harness, this represents the consumer within that
    // harness that should consume these bytes. If sent by a harness, this
    // represents the producer of these bytes.
    //
    // Note that a single element may span multiple Data messages.
    //
    // Note that a sending/receiving pair should share the same target
    // identifier.
    Target target = 2;

    // (Optional) Represents a part of a logical byte stream. Elements within
    // the logical byte stream are encoded in the nested context and
    // concatenated together.
    //
    // An empty data block represents the end of stream for the given
    // instruction and target.
    bytes data = 3;
  }

  // (Required) A list containing parts of logical byte streams. Each part
  // names its own instruction and target, so a single Elements message may
  // carry parts of several different logical streams.
  repeated Data data = 1;
}
// An API for exchanging logical byte streams of encoded elements between
// harnesses (for example between the runner and the SDK).
// Stable
service BeamFnData {
  // Used to send data between harnesses.
  rpc Data(
    // A stream of data representing input.
    stream Elements
  ) returns (
    // A stream of data representing output.
    stream Elements
  ) {}
}
/*
* State API
*/
// A request from the SDK to the runner to get, append to, or clear the state
// identified by state_key.
message StateRequest {
  // (Required) A unique identifier provided by the SDK which represents this
  // request's execution. The StateResponse corresponding with this request
  // will have the matching id.
  string id = 1;

  // (Required) The associated instruction id of the work that is currently
  // being processed. This allows for the runner to associate any modifications
  // to state to be committed with the appropriate work execution.
  string instruction_reference = 2;

  // (Required) The state key this request is for.
  StateKey state_key = 3;

  // (Required) The action to take on this request.
  oneof request {
    // A request to get state.
    StateGetRequest get = 1000;

    // A request to append to state.
    StateAppendRequest append = 1001;

    // A request to clear state.
    StateClearRequest clear = 1002;
  }
}
// The runner's response to a StateRequest.
message StateResponse {
  // (Required) A reference provided by the SDK which represents a request's
  // execution. The StateResponse must have the matching id when responding
  // to the SDK.
  string id = 1;

  // (Optional) If this is specified, then the state request has failed.
  // A human readable string representing the reason as to why the request
  // failed. As a proto3 string, an empty value means no error was reported.
  string error = 2;

  // A corresponding response matching the request will be populated.
  oneof response {
    // A response to getting state.
    StateGetResponse get = 1000;

    // A response to appending to state.
    StateAppendResponse append = 1001;

    // A response to clearing state.
    StateClearResponse clear = 1002;
  }
}
// An API allowing the SDK to access state that the runner stores on its
// behalf. StateRequests flow on the client-to-server stream, so the SDK
// harness is the gRPC client of this service.
service BeamFnState {
  // Used to get/append/clear state stored by the runner on behalf of the SDK.
  rpc State(
    // A stream of state instructions requested of the runner.
    stream StateRequest
  ) returns (
    // A stream of responses to state instructions the runner was asked to
    // perform.
    stream StateResponse
  ) {}
}
// A token an SDK can use to cache data looked up using the State API across
// multiple bundles (see ProcessBundleRequest.cache_tokens).
message CacheToken {
  // (Required) Represents the primitive transform and local name associated
  // with this cache token.
  //
  // By combining the target's primitive_transform_reference with a name
  // representing:
  //   * the input, we refer to the iterable portion of a large GBK
  //   * the side input, we refer to the side input
  //   * the user state, we refer to user state
  Target target = 1;

  // (Required) An opaque identifier.
  bytes token = 2;
}
// Identifies the piece of runner-held state a StateRequest operates on.
message StateKey {
  // (Required) Represents the primitive transform and local name associated
  // with this state key.
  //
  // By combining the target's primitive_transform_reference with a name
  // representing:
  //   * the input, we refer to fetching the iterable portion of a large GBK
  //   * the side input, we refer to fetching the side input
  //   * the user state, we refer to fetching user state
  Target target = 1;

  // (Required) The bytes of the window which this state request is for encoded
  // in the nested context.
  bytes window = 2;

  // (Required) The user key encoded in the nested context.
  bytes key = 3;
}
// A logical byte stream which can be continued using the state API.
message ContinuableStream {
  // (Optional) If specified, represents a token which can be used with the
  // state API to get the next chunk of this logical byte stream. The end of
  // the logical byte stream is signalled by this field being unset.
  //
  // Because proto3 bytes fields have implicit presence, "unset" and "empty"
  // are indistinguishable; an empty token marks the end of the stream.
  bytes continuation_token = 1;

  // Represents a part of a logical byte stream. Elements within
  // the logical byte stream are encoded in the nested context and
  // concatenated together.
  bytes data = 2;
}
// A request to get state.
message StateGetRequest {
  // (Optional) If specified, signals to the runner that the response
  // should resume from the following continuation token, such as one returned
  // in a previous StateGetResponse's ContinuableStream.
  //
  // If unspecified (empty), signals to the runner that the response should
  // start from the beginning of the logical continuable stream.
  bytes continuation_token = 1;
}
// A response to get state.
message StateGetResponse {
  // (Required) The response containing a continuable logical byte stream.
  ContinuableStream stream = 1;
}
// A request to append state.
message StateAppendRequest {
  // Represents a part of a logical byte stream. Elements within
  // the logical byte stream are encoded in the nested context and
  // multiple append requests are concatenated together.
  bytes data = 1;
}
// A response to append state. It currently carries no fields.
message StateAppendResponse {
}
// A request to clear state. The state to clear is identified by the enclosing
// StateRequest's state_key, so this message carries no fields.
message StateClearRequest {
}
// A response to clear state. It currently carries no fields.
message StateClearResponse {
}
/*
* Logging API
*
* This is very stable. There can be some changes to how we define a LogEntry,
* to increase/decrease the severity types, the way we format an exception/stack
* trace, or the log site.
*/
// A log entry emitted by the SDK harness.
message LogEntry {
  // A list of log entries, enables buffering and batching of multiple
  // log messages using the logging API.
  message List {
    // (Required) One or more log messages.
    repeated LogEntry log_entries = 1;
  }

  // The severity of the event described in a log entry, expressed as one of the
  // severity levels listed below. For your reference, the levels are
  // assigned the listed numeric values. The effect of using numeric values
  // other than those listed is undefined.
  //
  // If you are writing log entries, you should map other severity encodings to
  // one of these standard levels. For example, you might map all of
  // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
  //
  // This list is intentionally not comprehensive; the intent is to provide a
  // common set of "good enough" severity levels so that logging front ends
  // can provide filtering and searching across log types. Users of the API are
  // free not to use all severity levels in their log messages.
  //
  // NOTE(review): unlike common proto3 style, these values are not prefixed
  // with the enum name and the zero value (TRACE) carries meaning. Renaming or
  // renumbering them would break generated code and wire compatibility, so they
  // are left as-is.
  enum Severity {
    // Trace level information, also the default log level unless
    // another severity is specified (an unset severity decodes as TRACE).
    TRACE = 0;

    // Debugging information.
    DEBUG = 10;

    // Normal events.
    INFO = 20;

    // Normal but significant events, such as start up, shut down, or
    // configuration.
    NOTICE = 30;

    // Warning events might cause problems.
    WARN = 40;

    // Error events are likely to cause problems.
    ERROR = 50;

    // Critical events cause severe problems or brief outages and may
    // indicate that a person must take action.
    CRITICAL = 60;
  }

  // (Required) The severity of the log statement.
  Severity severity = 1;

  // (Required) The time at which this log statement occurred.
  google.protobuf.Timestamp timestamp = 2;

  // (Required) A human readable message.
  string message = 3;

  // (Optional) An optional trace of the functions involved. For example, in
  // Java this can include multiple causes and multiple suppressed exceptions.
  string trace = 4;

  // (Optional) A reference to the instruction this log statement is associated
  // with.
  string instruction_reference = 5;

  // (Optional) A reference to the primitive transform this log statement is
  // associated with.
  string primitive_transform_reference = 6;

  // (Optional) Human-readable name of the function or method being invoked,
  // with optional context such as the class or package name. The format can
  // vary by language. For example:
  //   qual.if.ied.Class.method (Java)
  //   dir/package.func (Go)
  //   module.function (Python)
  //   file.cc:382 (C++)
  string log_location = 7;

  // (Optional) The name of the thread this log statement is associated with.
  string thread = 8;
}
// A log control message sent to the SDK on the BeamFnLogging stream to
// configure its logging. It currently carries no fields.
message LogControl {
}
// An API for the SDK harness to stream batched log entries to the runner. The
// SDK sends LogEntry.List messages on the client-to-server stream, so it is
// the gRPC client of this service.
// Stable
service BeamFnLogging {
  // Allows for the SDK to emit log entries which the runner can
  // associate with the active job.
  rpc Logging(
    // A stream of log entries batched into lists emitted by the SDK harness.
    stream LogEntry.List
  ) returns (
    // A stream of log control messages used to configure the SDK.
    stream LogControl
  ) {}
}
/*
* Environment types
*/
// Describes where to connect to an API service, including any authentication
// that is required.
message ApiServiceDescriptor {
  // (Required) A pipeline level unique id which can be used as a reference to
  // refer to this.
  string id = 1;

  // (Required) The URL to connect to.
  string url = 2;

  // (Optional) The method for authentication. If unspecified, access to the
  // url is already being performed in a trusted context (e.g. localhost,
  // private network).
  oneof authentication {
    OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3;
  }
}
// Authentication using an OAuth 2.0 "client_credentials" grant; see
// ApiServiceDescriptor.authentication.
message OAuth2ClientCredentialsGrant {
  // (Required) The URL to submit a "client_credentials" grant type request for
  // an OAuth access token which will be used as a bearer token for requests.
  string url = 1;
}
// A Docker container configuration for launching the SDK harness to execute
// user specified functions.
message DockerContainer {
  // (Required) A pipeline level unique id which can be used as a reference to
  // refer to this.
  string id = 1;

  // (Required) The Docker container URI.
  // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
  string uri = 2;

  // (Optional) Docker registry specification.
  // If unspecified, the uri is expected to be able to be fetched without
  // requiring additional configuration by a runner.
  string registry_reference = 3;
}