// blob: b45be09efb6493dc93f7d37719a2ddad00eeaafe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers describing the Runner API, which is the runner-independent,
* SDK-independent definition of the Beam model.
*/
syntax = "proto3";
package org.apache.beam.model.pipeline.v1;
option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";
import "google/protobuf/any.proto";
// A set of mappings from id to message. This is included as an optional field
// on any proto message that may contain references needing resolution.
message Components {
  // (Required) Every PTransform, keyed by its pipeline-scoped id.
  map<string, PTransform> transforms = 1;

  // (Required) Every PCollection, keyed by its pipeline-scoped id.
  map<string, PCollection> pcollections = 2;

  // (Required) Every WindowingStrategy, keyed by its pipeline-scoped id.
  map<string, WindowingStrategy> windowing_strategies = 3;

  // (Required) Every Coder, keyed by its pipeline-scoped id.
  map<string, Coder> coders = 4;

  // (Required) Every Environment, keyed by its pipeline-scoped id.
  map<string, Environment> environments = 5;
}
// A disjoint union of all the things that may contain references
// that require Components to resolve.
message MessageWithComponents {
  // (Optional) The by-reference components of the root message. Supplying
  // them makes this message standalone.
  //
  // When this is absent, the root message is expected to contain no
  // references.
  Components components = 1;

  // (Required) The root message. Any pointers it contains should be resolved
  // by looking them up inside `components`.
  //
  // NOTE(review): field numbers 5 and 10 are not used by any member of this
  // oneof; if earlier versions of the schema used them, consider reserving
  // them.
  oneof root {
    Coder coder = 2;
    CombinePayload combine_payload = 3;
    SdkFunctionSpec sdk_function_spec = 4;
    ParDoPayload par_do_payload = 6;
    PTransform ptransform = 7;
    PCollection pcollection = 8;
    ReadPayload read_payload = 9;
    SideInput side_input = 11;
    WindowIntoPayload window_into_payload = 12;
    WindowingStrategy windowing_strategy = 13;
    FunctionSpec function_spec = 14;
  }
}
// A Pipeline is a hierarchical graph of PTransforms, linked
// by PCollections.
//
// This is represented by a number of by-reference maps to nodes,
// PCollections, SDK environments, UDF, etc., for
// supporting compact reuse and arbitrary graph structure.
//
// All of the keys in the maps here are arbitrary strings that are only
// required to be internally consistent within this proto message.
message Pipeline {
  // (Required) The coders, UDFs, graph nodes, etc. that make up this
  // pipeline.
  Components components = 1;

  // (Required) Ids of every PTransform that is not nested inside another
  // PTransform.
  //
  // They must appear in shallow topological order, so that a recursive
  // traversal visiting them in this order is topological at every level.
  repeated string root_transform_ids = 2;

  // (Optional) Static display data for the pipeline; may be omitted when
  // there is none.
  DisplayData display_data = 3;
}
// An applied PTransform! This does not contain the graph data, but only the
// fields specific to a graph node that is a Runner API transform
// between PCollections.
message PTransform {
  // (Required) A unique name for this application node.
  //
  // Ideally the name stays stable across evolutions of a pipeline, so that it
  // can be used for logging and for associating pipeline state with a node.
  //
  // If it is not stable, the runner decides what happens. Most importantly,
  // it must always be present and unique, even if it was autogenerated.
  string unique_name = 5;

  // (Optional) A URN and payload that, together, fully define the semantics
  // of this transform.
  //
  // When absent, this must be an "anonymous" composite transform.
  //
  // For a primitive transform in the Runner API this is required, and the
  // payloads are well-defined messages: when the URN indicates ParDo the
  // payload is a ParDoPayload, and so on.
  //
  // TODO: document the standardized URNs and payloads
  // TODO: separate standardized payloads into a separate proto file
  //
  // For some special composite transforms the payload is also officially
  // defined:
  //
  //  - when the URN is "urn:beam:transforms:combine" it is a CombinePayload
  FunctionSpec spec = 1;

  // (Optional) If this node is a composite, the ids of the transforms it
  // contains.
  repeated string subtransforms = 2;

  // (Required) A map from local input names (unique only within this map, and
  // likely embedded in the transform payload and serialized user code) to
  // PCollection ids.
  //
  // The payload for this transform may clarify how these inputs relate. For
  // example:
  //
  //  - for a Flatten transform they are merged
  //  - for a ParDo transform, some may be side inputs
  //
  // Every input is recorded here so that the topological ordering of the
  // graph is consistent whether or not the payload is understood.
  map<string, string> inputs = 3;

  // (Required) A map from local output names (unique only within this map,
  // and likely embedded in the transform payload and serialized user code)
  // to PCollection ids.
  //
  // The URN or payload for this transform node may clarify the type of these
  // outputs and how they relate. For example:
  //
  //  - for a ParDo transform, these are tags on PCollections, which will be
  //    embedded in the DoFn.
  map<string, string> outputs = 4;

  // (Optional) Static display data for this PTransform application. It may be
  // omitted if there is none, or if it is not relevant (such as use by the
  // Fn API).
  DisplayData display_data = 6;
}
// A PCollection!
message PCollection {
  // (Required) A unique name for the PCollection.
  //
  // Ideally the name stays stable across evolutions of a pipeline, so that it
  // can be used for logging and for associating pipeline state with a node.
  //
  // If it is not stable, the runner decides what happens. Most importantly,
  // it must always be present, even if it was autogenerated.
  string unique_name = 1;

  // (Required) The id of the Coder for this PCollection.
  string coder_id = 2;

  // (Required) Whether this PCollection is bounded or unbounded.
  IsBounded.Enum is_bounded = 3;

  // (Required) The id of the windowing strategy for this PCollection.
  string windowing_strategy_id = 4;

  // (Optional) Static display data for this PCollection. It may be omitted
  // if there is none, or if it is not relevant (such as use by the Fn API).
  DisplayData display_data = 5;
}
// The payload for the primitive ParDo transform.
message ParDoPayload {
  // (Required) The SdkFunctionSpec of the DoFn.
  SdkFunctionSpec do_fn = 1;

  // (Required) Extra pieces of context the DoFn may need that are not
  // otherwise represented in this payload. These may force runners to
  // execute the ParDo differently.
  repeated Parameter parameters = 2;

  // (Optional) Side inputs keyed by local input name, describing the
  // expected access pattern.
  map<string, SideInput> side_inputs = 3;

  // (Optional) State specifications keyed by local state name.
  map<string, StateSpec> state_specs = 4;

  // (Optional) Timer specifications keyed by local timer name.
  map<string, TimerSpec> timer_specs = 5;

  // Whether the DoFn is splittable.
  bool splittable = 6;
}
// Parameters that a UDF might require.
//
// The details of how a runner sends these parameters to the SDK harness
// are the subject of the Fn API.
//
// The details of how an SDK harness delivers them to the UDF is entirely
// up to the SDK. (for some SDKs there may be parameters that are not
// represented here if the runner doesn't need to do anything)
//
// Here, the parameters are simply indicators to the runner that they
// need to run the function a particular way.
//
// TODO: the evolution of the Fn API will influence what needs explicit
// representation here
message Parameter {
  // The kind of parameter the UDF requires.
  Type.Enum type = 1;

  // Namespace wrapper so the enum values below do not collide with other
  // enums in this package.
  message Type {
    enum Enum {
      // Default zero value; not a meaningful parameter kind.
      UNSPECIFIED = 0;
      WINDOW = 1;
      PIPELINE_OPTIONS = 2;
      RESTRICTION_TRACKER = 3;
    }
  }
}
// A specification of a single piece of user state; at most one kind of state
// may be set.
message StateSpec {
  oneof spec {
    ValueStateSpec value_spec = 1;
    BagStateSpec bag_spec = 2;
    CombiningStateSpec combining_spec = 3;
    MapStateSpec map_spec = 4;
    SetStateSpec set_spec = 5;
  }
}
message ValueStateSpec {
  // The id of the Coder for the stored value.
  string coder_id = 1;
}
message BagStateSpec {
  // The id of the Coder for elements of the bag.
  string element_coder_id = 1;
}
message CombiningStateSpec {
  // The id of the Coder for the accumulator.
  string accumulator_coder_id = 1;

  // The SdkFunctionSpec of the CombineFn.
  SdkFunctionSpec combine_fn = 2;
}
message MapStateSpec {
  // The id of the Coder for map keys.
  string key_coder_id = 1;

  // The id of the Coder for map values.
  string value_coder_id = 2;
}
message SetStateSpec {
  // The id of the Coder for elements of the set.
  string element_coder_id = 1;
}
message TimerSpec {
  // The time domain of this timer.
  TimeDomain.Enum time_domain = 1;
}
// Namespace wrapper for the boundedness enum.
message IsBounded {
  enum Enum {
    // Default zero value; not a meaningful boundedness.
    UNSPECIFIED = 0;
    UNBOUNDED = 1;
    BOUNDED = 2;
  }
}
// The payload for the primitive Read transform.
message ReadPayload {
  // (Required) The SdkFunctionSpec of the source this Read consumes.
  SdkFunctionSpec source = 1;

  // (Required) Whether the source is bounded or unbounded.
  IsBounded.Enum is_bounded = 2;

  // TODO: full audit of fields required by runners as opposed to SDK harness
}
// The payload for the WindowInto transform.
message WindowIntoPayload {
  // (Required) The SdkFunctionSpec of the WindowFn to apply.
  SdkFunctionSpec window_fn = 1;
}
// The payload for the special-but-not-primitive Combine transform.
message CombinePayload {
  // (Required) The SdkFunctionSpec of the CombineFn.
  SdkFunctionSpec combine_fn = 1;

  // (Required) The id of the Coder for accumulators of the CombineFn.
  string accumulator_coder_id = 2;

  // (Required) Extra pieces of context the CombineFn may need that are not
  // otherwise represented in this payload. These may force runners to
  // execute the Combine differently.
  repeated Parameter parameters = 3;

  // (Optional) Side inputs keyed by local input name, describing the
  // expected access pattern.
  map<string, SideInput> side_inputs = 4;
}
// The payload for the test-only primitive TestStream
message TestStreamPayload {
  // (Required) The id of the Coder for elements carried by the events.
  string coder_id = 1;

  // The events of this TestStream.
  // NOTE(review): presumably replayed in list order — confirm.
  repeated Event events = 2;

  // A single TestStream event; at most one kind may be set.
  message Event {
    oneof event {
      AdvanceWatermark watermark_event = 1;
      AdvanceProcessingTime processing_time_event = 2;
      AddElements element_event = 3;
    }

    message AdvanceWatermark {
      // The new watermark.
      // NOTE(review): units are not stated here (presumably milliseconds,
      // like other durations in this file) — confirm.
      int64 new_watermark = 1;
    }

    message AdvanceProcessingTime {
      // How far to advance processing time.
      // NOTE(review): units are not stated here — confirm.
      int64 advance_duration = 1;
    }

    message AddElements {
      repeated TimestampedElement elements = 1;
    }
  }

  // An element together with its timestamp.
  message TimestampedElement {
    // The element, presumably encoded with the payload's `coder_id` coder.
    bytes encoded_element = 1;
    int64 timestamp = 2;
  }
}
// The payload for the special-but-not-primitive WriteFiles transform.
message WriteFilesPayload {
  // (Required) The SdkFunctionSpec of the FileBasedSink.
  SdkFunctionSpec sink = 1;

  // (Required) The format function.
  SdkFunctionSpec format_function = 2;

  // NOTE(review): presumably whether writes are performed per window —
  // confirm and document.
  bool windowed_writes = 3;

  // NOTE(review): presumably whether the runner chooses the sharding —
  // confirm and document.
  bool runner_determined_sharding = 4;

  // Side inputs keyed by local input name.
  map<string, SideInput> side_inputs = 5;
}
// A coder, the binary format for serialization and deserialization of data in
// a pipeline.
message Coder {
  // (Required) The coder's specification as a URN plus parameters. It may be
  // a cross-language, agreed-upon format, or a "custom coder" usable only by
  // a particular SDK. Component coders are deliberately excluded, so that
  // they remain comprehensible to a runner whether or not the binary format
  // is agreed-upon.
  SdkFunctionSpec spec = 1;

  // (Optional) For a parametric coder such as ListCoder(VarIntCoder), the ids
  // of its component coders. Two encodings are identical only when the
  // SdkFunctionSpec and every component are identical, recursively.
  repeated string component_coder_ids = 2;
}
// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
// TODO: consider inlining field on PCollection
message WindowingStrategy {
  // (Required) The SdkFunctionSpec of the UDF that assigns windows, merges
  // windows, and shifts timestamps before they are combined according to
  // the OutputTime.
  SdkFunctionSpec window_fn = 1;

  // (Required) Whether or not the window fn is merging.
  //
  // Many optimizations require this knowledge.
  MergeStatus.Enum merge_status = 2;

  // (Required) The id of the Coder for the windows of this PCollection.
  string window_coder_id = 3;

  // (Required) The trigger to use when grouping this PCollection.
  Trigger trigger = 4;

  // (Required) Whether new panes fully replace prior panes or are deltas to
  // be combined with other panes (the combine should match whatever the
  // upstream grouping transform is).
  AccumulationMode.Enum accumulation_mode = 5;

  // (Required) For a grouping transform, how to compute the aggregate
  // timestamp. The window_fn may first shift it later; the OutputTime then
  // takes the max or min, or ignores it and takes the end of the window.
  //
  // This only matters for input to grouping transforms, but because those
  // may be introduced in runner-specific ways it travels with the windowing
  // strategy.
  OutputTime.Enum output_time = 6;

  // (Required) Indicates when output should be emitted upon window
  // expiration.
  ClosingBehavior.Enum closing_behavior = 7;

  // (Required) The duration, in milliseconds, beyond the end of a window at
  // which the window becomes droppable.
  int64 allowed_lateness = 8;

  // (Required) Indicates whether empty on-time panes should be omitted.
  //
  // NOTE(review): unlike every other field, this name is PascalCase and
  // identical to its type's name. Renaming it to `on_time_behavior` would be
  // wire-compatible, but it would change the JSON/text-format names and some
  // generated accessors, so the name is left unchanged here.
  OnTimeBehavior.Enum OnTimeBehavior = 9;

  // (Required) Whether or not the window fn assigns each input to exactly
  // one window.
  //
  // Some optimizations require this knowledge.
  bool assigns_to_one_window = 10;
}
// Whether a PCollection's WindowFn is non-merging, merging, or
// merging-but-already-merged. In the last case, a subsequent GroupByKey is
// almost always going to do something the user does not want.
message MergeStatus {
  enum Enum {
    // Default zero value; not a meaningful merge status.
    UNSPECIFIED = 0;

    // The WindowFn does not require merging.
    // Examples: global window, FixedWindows, SlidingWindows.
    NON_MERGING = 1;

    // The WindowFn is merging, and the PCollection has not yet had merging
    // performed.
    // Example: Sessions prior to a GroupByKey.
    NEEDS_MERGE = 2;

    // The WindowFn is merging, and merging has already occurred for the
    // PCollection.
    // Example: Sessions after a GroupByKey.
    ALREADY_MERGED = 3;
  }
}
// Whether or not subsequent outputs of aggregations should be entire
// replacement values or just the aggregation of inputs received since
// the prior output.
message AccumulationMode {
  enum Enum {
    // Default zero value; not a meaningful accumulation mode.
    UNSPECIFIED = 0;

    // Each output discards the aggregation.
    DISCARDING = 1;

    // The aggregation keeps accumulating across outputs.
    ACCUMULATING = 2;
  }
}
// Controls whether or not an aggregating transform should output data
// when a window expires.
message ClosingBehavior {
  enum Enum {
    // Default zero value; not a meaningful closing behavior.
    UNSPECIFIED = 0;

    // Emit output when a window expires, regardless of whether any new data
    // has arrived since the last output.
    EMIT_ALWAYS = 1;

    // Emit output only when new data has arrived since the last output.
    EMIT_IF_NONEMPTY = 2;
  }
}
// Controls whether or not an aggregating transform should output data
// when an on-time pane is empty.
message OnTimeBehavior {
  enum Enum {
    // Default zero value; not a meaningful on-time behavior.
    UNSPECIFIED = 0;

    // Always fire the on-time pane: even with no new data since the previous
    // firing, an element will be produced.
    FIRE_ALWAYS = 1;

    // Fire the on-time pane only when there is new data since the previous
    // firing.
    FIRE_IF_NONEMPTY = 2;
  }
}
// When a number of windowed, timestamped inputs are aggregated, determines the
// timestamp of the resulting output.
message OutputTime {
  enum Enum {
    // Default zero value; not a meaningful output time.
    UNSPECIFIED = 0;

    // The output is timestamped at the end of the window.
    END_OF_WINDOW = 1;

    // The output carries the latest timestamp among input elements received
    // since the last output.
    LATEST_IN_PANE = 2;

    // The output carries the earliest timestamp among input elements received
    // since the last output.
    EARLIEST_IN_PANE = 3;
  }
}
// The different time domains in the Beam model.
message TimeDomain {
  enum Enum {
    // Default zero value; not a meaningful time domain.
    UNSPECIFIED = 0;

    // Event time: time as seen from the perspective of the data.
    EVENT_TIME = 1;

    // Processing time: time as seen from the perspective of the pipeline's
    // execution.
    PROCESSING_TIME = 2;

    // Synchronized processing time: the minimum processing time across all
    // pending elements.
    //
    // An element's "processing time" is the local processing time at which
    // it was emitted.
    SYNCHRONIZED_PROCESSING_TIME = 3;
  }
}
// A small DSL for expressing when to emit new aggregations
// from a GroupByKey or CombinePerKey
//
// A trigger is described in terms of when it is _ready_ to permit output.
message Trigger {
  // Ready once every subtrigger is ready.
  message AfterAll {
    repeated Trigger subtriggers = 1;
  }

  // Ready once any subtrigger is ready.
  message AfterAny {
    repeated Trigger subtriggers = 1;
  }

  // Beginning with the first subtrigger, ready when the _current_ subtrigger
  // is ready. Each output advances the current subtrigger by one.
  message AfterEach {
    repeated Trigger subtriggers = 1;
  }

  // Ready once the input watermark has passed the end of the window.
  //
  // May carry implicitly-repeated subtriggers for early and late firings.
  // Reaching the end of the window moves from the early-firing subtrigger to
  // the late-firing one.
  message AfterEndOfWindow {
    // (Optional) Trigger governing output before the end of the window.
    Trigger early_firings = 1;
    // (Optional) Trigger governing output after the end of the window.
    Trigger late_firings = 2;
  }

  // Once input arrives, ready when the specified delay has elapsed.
  message AfterProcessingTime {
    // (Required) The transforms applied, in order, to an arriving element's
    // timestamp.
    repeated TimestampTransform timestamp_transforms = 1;
  }

  // Ready whenever upstream processing time has fully caught up with the
  // arrival time of an input element.
  message AfterSynchronizedProcessingTime {
  }

  // The default trigger. Equivalent to Repeat { AfterEndOfWindow }, but
  // denoted specially to indicate the user did not alter the triggering.
  message Default {
  }

  // Ready whenever the requisite number of input elements has arrived.
  message ElementCount {
    int32 element_count = 1;
  }

  // Never ready. There will only be an ON_TIME output and a final output at
  // window expiration.
  message Never {
  }

  // Always ready. Expressible as ElementCount(1) as well, but more explicit.
  message Always {
  }

  // Ready whenever either subtrigger is ready, but output finishes once the
  // `finally` subtrigger fires.
  message OrFinally {
    // (Required) Trigger governing main output; may fire repeatedly.
    Trigger main = 1;
    // (Required) Trigger governing termination of output.
    Trigger finally = 2;
  }

  // Ready whenever the subtrigger is ready; state is reset when the
  // subtrigger completes.
  message Repeat {
    // (Required) Trigger that is run repeatedly.
    Trigger subtrigger = 1;
  }

  // The full disjoint union of possible triggers.
  oneof trigger {
    AfterAll after_all = 1;
    AfterAny after_any = 2;
    AfterEach after_each = 3;
    AfterEndOfWindow after_end_of_window = 4;
    AfterProcessingTime after_processing_time = 5;
    AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
    Always always = 12;
    Default default = 7;
    ElementCount element_count = 8;
    Never never = 9;
    OrFinally or_finally = 10;
    Repeat repeat = 11;
  }
}
// A specification for a transformation on a timestamp.
//
// Primarily used by AfterProcessingTime triggers to transform
// the arrival time of input to a target time for firing.
message TimestampTransform {
  oneof timestamp_transform {
    Delay delay = 1;
    AlignTo align_to = 2;
  }

  // Delays a timestamp by a fixed amount.
  message Delay {
    // (Required) The delay, in milliseconds.
    int64 delay_millis = 1;
  }

  // Quantizes a timestamp to multiples of `period`, shifted by `offset`.
  message AlignTo {
    // Field numbers 1 and 2, and the name `alignment_size`, are not used by
    // this message. They are reserved so they cannot be reused with a
    // different meaning.
    // NOTE(review): the previous comment on `offset` referred to an
    // `alignment_size` field, which suggests these numbers were used by an
    // earlier version — confirm against the schema history.
    reserved 1, 2;
    reserved "alignment_size";

    // (Required) A duration, in milliseconds, to which delays should be
    // quantized.
    int64 period = 3;

    // (Required) An offset from 0, in milliseconds, for the quantization
    // specified by `period`.
    int64 offset = 4;
  }
}
// A specification for how to "side input" a PCollection.
message SideInput {
  // (Required) URN of the access pattern the `view_fn` needs in order to
  // present the desired SDK-specific interface to a UDF.
  //
  // The access pattern defines the SDK harness <-> Runner Harness RPC
  // interface used to read a side input.
  //
  // Because of its superior performance possibilities, the only access
  // pattern intended for Beam is "urn:beam:sideinput:multimap" (or some such
  // URN).
  FunctionSpec access_pattern = 1;

  // (Required) The SdkFunctionSpec of the UDF that adapts a particular
  // access_pattern to a user-facing view type.
  //
  // For example, View.asSingleton() may include a `view_fn` that adapts a
  // specially-designed multimap to a single value per window.
  SdkFunctionSpec view_fn = 2;

  // (Required) The SdkFunctionSpec of the UDF that maps a main input window
  // to a side input window.
  //
  // For example, when the main input uses fixed one-hour windows, this can
  // specify that the side input be accessed according to the day containing
  // that hour.
  SdkFunctionSpec window_mapping_fn = 3;
}
// An environment for executing UDFs. Generally an SDK container URL, but
// there can be many for a single SDK, for example to provide dependency
// isolation.
message Environment {
  // (Required) The URL of a container.
  //
  // TODO: reconcile with Fn API's DockerContainer structure by
  // adding adequate metadata to know how to interpret the container
  string url = 1;
}
// A specification of a user-defined function, together with a reference to an
// environment capable of invoking it.
message SdkFunctionSpec {
  // (Required) The complete specification of the function.
  FunctionSpec spec = 1;

  // (Required) The id of an execution environment that can invoke this
  // function.
  string environment_id = 2;
}
// A URN along with a parameter object whose schema is determined by the
// URN.
//
// This structure is reused in two distinct, but compatible, ways:
//
// 1. This can be a specification of the function over PCollections
// that a PTransform computes.
// 2. This can be a specification of a user-defined function, possibly
// SDK-specific. (There must be adequate context external to this message
// to indicate the environment in which the UDF can be understood.)
//
// Though not explicit in this proto, there are two possibilities
// for the relationship of a runner to this specification that
// one should bear in mind:
//
// 1. The runner understands the URN. For example, it might be
// a well-known URN like "urn:beam:transform:Top" or
// "urn:beam:windowfn:FixedWindows" with
// an agreed-upon payload (e.g. a number or duration,
// respectively).
// 2. The runner does not understand the URN. It might be an
// SDK specific URN such as "urn:beam:dofn:javasdk:1.0"
// that indicates to the SDK what the payload is,
// such as a serialized Java DoFn from a particular
// version of the Beam Java SDK. The payload will often
// then be an opaque message such as bytes in a
// language-specific serialization format.
message FunctionSpec {
  // Field number 2 is not used by this message. It is reserved so that it
  // cannot be accidentally reassigned with a different meaning.
  // NOTE(review): presumably it held an earlier form of the payload — confirm
  // against the schema history.
  reserved 2;

  // (Required) A URN that describes the accompanying payload.
  // Whoever inspects this spec and does not recognize the URN should treat
  // the payload as opaque and pass it along as-is.
  string urn = 1;

  // (Optional) The data specifying any parameters to the URN. It may be
  // omitted if the URN does not require any arguments.
  bytes payload = 3;
}
// TODO: transfer javadoc here
message DisplayData {
  // (Required) The display data entries.
  repeated Item items = 1;

  // Fully identifies a DisplayData.Item.
  message Identifier {
    // (Required) The transform from which this display data originates.
    string transform_id = 1;

    // (Optional) The URN giving the type of the originating transform, when
    // it has one.
    string transform_urn = 2;

    // NOTE(review): presumably the key distinguishing this item among the
    // originating transform's display data — confirm and document.
    string key = 3;
  }

  // One entry of display data.
  message Item {
    // (Required) Identifies this item.
    Identifier id = 1;

    // (Required) The kind of value this item carries.
    Type.Enum type = 2;

    // (Required) The item's value.
    google.protobuf.Any value = 3;

    // (Optional) A shorter form of the value.
    google.protobuf.Any short_value = 4;

    // (Optional) A label for this item.
    string label = 5;

    // (Optional) A link URL for this item.
    string link_url = 6;
  }

  // Namespace wrapper for the display data type enum.
  message Type {
    enum Enum {
      // Default zero value; not a meaningful type.
      UNSPECIFIED = 0;
      STRING = 1;
      INTEGER = 2;
      FLOAT = 3;
      BOOLEAN = 4;
      TIMESTAMP = 5;
      DURATION = 6;
      JAVA_CLASS = 7;
    }
  }
}