// blob: ec05ef045b6aad89f1beb6f737fb37c9de0d2c3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers describing the Runner API, which is the runner-independent,
* SDK-independent definition of the Beam model.
*/
syntax = "proto3";
package org.apache.beam.model.pipeline.v1;
option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";
import "endpoints.proto";
import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
// Holder for constants shared through the Runner API. Each constant is
// carried as a string in the (beam_constant) enum value option.
message BeamConstants {
  enum Constants {
    // All timestamps are in milliseconds since Jan 1, 1970.

    // The minimum timestamp.
    MIN_TIMESTAMP_MILLIS = 0 [(beam_constant) = "-9223372036854775"];

    // The maximum timestamp.
    MAX_TIMESTAMP_MILLIS = 1 [(beam_constant) = "9223372036854775"];

    // The maximum timestamp for the global window.
    //
    // Triggers use maxTimestamp to set timers' timestamps, and timers fire
    // when the watermark passes their timestamps, so this value needs to be
    // smaller than MAX_TIMESTAMP_MILLIS.
    //
    // One standard day (86,400,000 ms) is subtracted from
    // MAX_TIMESTAMP_MILLIS so that maxTimestamp stays below
    // MAX_TIMESTAMP_MILLIS even after rounding up to seconds or minutes.
    // See also GlobalWindow in the Java SDK.
    GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS = 2 [(beam_constant) = "9223371950454775"];
  }
}
// Id-to-message lookup tables. This is included as an optional field on any
// proto message that may contain references needing resolution.
message Components {
  // (Required) PTransforms, keyed by pipeline-scoped id.
  map<string, PTransform> transforms = 1;

  // (Required) PCollections, keyed by pipeline-scoped id.
  map<string, PCollection> pcollections = 2;

  // (Required) WindowingStrategies, keyed by pipeline-scoped id.
  map<string, WindowingStrategy> windowing_strategies = 3;

  // (Required) Coders, keyed by pipeline-scoped id.
  map<string, Coder> coders = 4;

  // (Required) Environments, keyed by pipeline-scoped id.
  map<string, Environment> environments = 5;
}
// A Pipeline is a hierarchical graph of PTransforms, linked by PCollections.
//
// The graph is represented by a number of by-reference maps to nodes,
// PCollections, SDK environments, UDFs, etc., which supports compact reuse
// and arbitrary graph structure.
//
// The keys of those maps are arbitrary strings; they are only required to be
// internally consistent within this proto message.
message Pipeline {
  // (Required) The coders, UDFs, graph nodes, etc., that make up this
  // pipeline.
  Components components = 1;

  // (Required) The ids of all PTransforms that are not contained within
  // another PTransform.
  //
  // These must be in shallow topological order, so that traversing them
  // recursively in this order yields a recursively topological traversal.
  repeated string root_transform_ids = 2;

  // (Optional) Static display data for the pipeline. May be omitted if there
  // is none.
  DisplayData display_data = 3;
}
// An applied PTransform! This does not contain the graph data; it holds only
// the fields specific to a graph node that is a Runner API transform between
// PCollections.
message PTransform {
  // (Required) A unique name for the application node.
  //
  // Ideally, this should be stable over multiple evolutions of a pipeline
  // for the purposes of logging and associating pipeline state with a node,
  // etc.
  //
  // If it is not stable, then the runner decides what will happen. Most
  // importantly, it must always be present and unique, even if it is
  // autogenerated.
  string unique_name = 5;

  // (Optional) A URN and payload that, together, fully define the semantics
  // of this transform.
  //
  // If absent, this must be an "anonymous" composite transform.
  //
  // For primitive transforms in the Runner API, this is required, and the
  // payloads are well-defined messages. When the URN indicates ParDo it
  // is a ParDoPayload, and so on.
  //
  // TODO: document the standardized URNs and payloads
  // TODO: separate standardized payloads into a separate proto file
  //
  // For some special composite transforms, the payload is also officially
  // defined:
  //
  //  - for the Combine composites it is a CombinePayload (see
  //    StandardPTransforms.Composites for the URNs)
  //
  FunctionSpec spec = 1;

  // (Optional) If this node is a composite, a list of the ids of the
  // transforms that it contains.
  repeated string subtransforms = 2;

  // (Required) A map from local names of inputs (unique only within this
  // map, and likely embedded in the transform payload and serialized user
  // code) to PCollection ids.
  //
  // The payload for this transform may clarify the relationship of these
  // inputs. For example:
  //
  //  - for a Flatten transform they are merged
  //  - for a ParDo transform, some may be side inputs
  //
  // All inputs are recorded here so that the topological ordering of
  // the graph is consistent whether or not the payload is understood.
  //
  map<string, string> inputs = 3;

  // (Required) A map from local names of outputs (unique only within this
  // map, and likely embedded in the transform payload and serialized user
  // code) to PCollection ids.
  //
  // The URN or payload for this transform node may clarify the type and
  // relationship of these outputs. For example:
  //
  //  - for a ParDo transform, these are tags on PCollections, which will be
  //    embedded in the DoFn.
  //
  map<string, string> outputs = 4;

  // (Optional) Static display data for this PTransform application. It may
  // be omitted if there is none, or if it is not relevant (such as use by
  // the Fn API).
  DisplayData display_data = 6;
}
// Standard PTransform URNs, grouped by category. Each URN is attached to an
// enum value through the (beam_urn) option.
message StandardPTransforms {
  enum Primitives {
    // Represents Beam's parallel do operation.
    // Payload: ParDoPayload.
    // NOTE(review): a TODO(BEAM-3595) here asked to change this URN to
    // beam:transform:pardo:v1; the URN below already has that value, so the
    // TODO looks resolved - confirm and close the issue.
    PAR_DO = 0 [(beam_urn) = "beam:transform:pardo:v1"];

    // Represents Beam's flatten operation.
    // Payload: None.
    FLATTEN = 1 [(beam_urn) = "beam:transform:flatten:v1"];

    // Represents Beam's group-by-key operation.
    // Payload: None.
    GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];

    // Represents the operation generating a single empty element.
    IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];

    // Represents the Window.into() operation.
    // Payload: WindowIntoPayload.
    ASSIGN_WINDOWS = 4 [(beam_urn) = "beam:transform:window_into:v1"];

    // Represents the TestStream.
    // Payload: TestStreamPayload.
    TEST_STREAM = 5 [(beam_urn) = "beam:transform:teststream:v1"];

    // Represents mapping of main input window onto side input window.
    //
    // Side input window mapping function:
    //   Input:  KV<nonce, MainInputWindow>
    //   Output: KV<nonce, SideInputWindow>
    //
    // For each main input window, the side input window is returned. The
    // nonce is used by a runner to associate each input with its output.
    // The nonce is represented as an opaque set of bytes.
    //
    // Payload: WindowMappingFn from SideInputSpec.
    MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];

    // Used to merge windows during a GroupByKey.
    //
    // Window merging function:
    //   Input:  KV<nonce, iterable<OriginalWindow>>
    //   Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>,
    //                        iterable<KV<MergedWindow,
    //                                    iterable<ConsumedOriginalWindow>>>>>
    //
    // For each set of original windows, a list of all unmerged windows is
    // output alongside a map of merged window to set of consumed windows.
    // All original windows must be contained in either the unmerged original
    // window set or one of the consumed original window sets. Each original
    // window can only be part of one output set. The nonce is used by a
    // runner to associate each input with its output. The nonce is
    // represented as an opaque set of bytes.
    //
    // Payload: WindowFn from WindowingStrategy.
    MERGE_WINDOWS = 7 [(beam_urn) = "beam:transform:merge_windows:v1"];
  }

  enum DeprecatedPrimitives {
    // Represents the operation to read a Bounded or Unbounded source.
    // Payload: ReadPayload.
    READ = 0 [(beam_urn) = "beam:transform:read:v1"];

    // Runners should move away from translating `CreatePCollectionView` and
    // treat this as part of the translation for a `ParDo` side input.
    CREATE_VIEW = 1 [(beam_urn) = "beam:transform:create_view:v1"];
  }

  enum Composites {
    // Represents the Combine.perKey() operation.
    // If this is produced by an SDK, it is assumed that the SDK understands
    // each of CombineComponents.
    // Payload: CombinePayload.
    COMBINE_PER_KEY = 0 [(beam_urn) = "beam:transform:combine_per_key:v1"];

    // Represents the Combine.globally() operation.
    // If this is produced by an SDK, it is assumed that the SDK understands
    // each of CombineComponents.
    // Payload: CombinePayload.
    COMBINE_GLOBALLY = 1 [(beam_urn) = "beam:transform:combine_globally:v1"];

    // Represents the Reshuffle operation.
    RESHUFFLE = 2 [(beam_urn) = "beam:transform:reshuffle:v1"];

    // Less well-known. Payload: WriteFilesPayload.
    WRITE_FILES = 3 [(beam_urn) = "beam:transform:write_files:v1"];
  }

  // Payload for all of these: CombinePayload
  enum CombineComponents {
    // Represents the Pre-Combine part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.ta0g6ase8z07
    // Payload: CombinePayload
    COMBINE_PER_KEY_PRECOMBINE = 0 [(beam_urn) = "beam:transform:combine_per_key_precombine:v1"];

    // Represents the Merge Accumulators part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.jco9rvatld5m
    // Payload: CombinePayload
    COMBINE_PER_KEY_MERGE_ACCUMULATORS = 1 [(beam_urn) = "beam:transform:combine_per_key_merge_accumulators:v1"];

    // Represents the Extract Outputs part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.i9i6p8gtl6ku
    // Payload: CombinePayload
    COMBINE_PER_KEY_EXTRACT_OUTPUTS = 2 [(beam_urn) = "beam:transform:combine_per_key_extract_outputs:v1"];

    // Represents the Combine Grouped Values transform, as described in the
    // following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.aj86ew4v1wk
    // Payload: CombinePayload
    COMBINE_GROUPED_VALUES = 3 [(beam_urn) = "beam:transform:combine_grouped_values:v1"];
  }

  // Payload for all of these: ParDoPayload containing the user's SDF
  enum SplittableParDoComponents {
    // Pairs the input element with its initial restriction.
    // Input: element; output: KV(element, restriction).
    PAIR_WITH_RESTRICTION = 0 [(beam_urn) = "beam:transform:sdf_pair_with_restriction:v1"];

    // Splits the restriction inside an element/restriction pair.
    // Input: KV(element, restriction); output: KV(element, restriction).
    SPLIT_RESTRICTION = 1 [(beam_urn) = "beam:transform:sdf_split_restriction:v1"];

    // Applies the DoFn to every element/restriction pair in a uniquely keyed
    // collection, in a splittable fashion.
    // Input: KV(bytes, KV(element, restriction)); output: DoFn's output.
    // The first "bytes" is an opaque unique key using the standard bytes
    // coder.
    // Typically a runner would rewrite this into a runner-specific grouping
    // operation supporting state and timers, followed by PROCESS_ELEMENTS,
    // with some runner-specific glue code in between.
    PROCESS_KEYED_ELEMENTS = 2 [(beam_urn) = "beam:transform:sdf_process_keyed_elements:v1"];

    // Like PROCESS_KEYED_ELEMENTS, but without the unique key - just
    // elements and restrictions.
    // Input: KV(element, restriction); output: DoFn's output.
    PROCESS_ELEMENTS = 3 [(beam_urn) = "beam:transform:sdf_process_elements:v1"];

    // Splits the restriction of each element/restriction pair and returns
    // the resulting splits, with a corresponding floating point size
    // estimate for each.
    // A reasonable value for size is the number of bytes expected to be
    // produced by this (element, restriction) pair.
    // Input: KV(element, restriction)
    // Output: KV(KV(element, restriction), size)
    SPLIT_AND_SIZE_RESTRICTIONS = 4 [(beam_urn) = "beam:transform:sdf_split_and_size_restrictions:v1"];

    // Like PROCESS_ELEMENTS, but accepts the sized output produced by
    // SPLIT_AND_SIZE_RESTRICTIONS.
    // Input: KV(KV(element, restriction), size); output: DoFn's output.
    // Note: the URN spells "sized_element" in the singular; it is a
    // published identifier and must stay as written.
    PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 5 [(beam_urn) = "beam:transform:sdf_process_sized_element_and_restrictions:v1"];
  }
}
// Standard side input type URNs, attached to enum values through the
// (beam_urn) option.
message StandardSideInputTypes {
  enum Enum {
    // URN of the iterable side input type.
    ITERABLE = 0 [(beam_urn) = "beam:side_input:iterable:v1"];

    // URN of the multimap side input type.
    MULTIMAP = 1 [(beam_urn) = "beam:side_input:multimap:v1"];
  }
}
// A PCollection!
message PCollection {
  // (Required) A unique name for the PCollection.
  //
  // Ideally, this should be stable over multiple evolutions of a pipeline
  // for the purposes of logging and associating pipeline state with a node,
  // etc.
  //
  // If it is not stable, then the runner decides what will happen. Most
  // importantly, it must always be present, even if it is autogenerated.
  string unique_name = 1;

  // (Required) The id of the Coder for this PCollection.
  string coder_id = 2;

  // (Required) Whether this PCollection is bounded or unbounded.
  IsBounded.Enum is_bounded = 3;

  // (Required) The id of the windowing strategy for this PCollection.
  string windowing_strategy_id = 4;

  // (Optional) Static display data for this PCollection. It may be omitted
  // if there is none, or if it is not relevant (such as use by the Fn API).
  DisplayData display_data = 5;
}
// The payload for the primitive ParDo transform.
message ParDoPayload {
  // (Required) The SdkFunctionSpec of the DoFn.
  SdkFunctionSpec do_fn = 1;

  // (Required) Additional pieces of context the DoFn may require that
  // are not otherwise represented in the payload. These may force runners
  // to execute the ParDo differently.
  repeated Parameter parameters = 2;

  // (Optional) A mapping of local input names to side inputs, describing
  // the expected access pattern.
  map<string, SideInput> side_inputs = 3;

  // (Optional) A mapping of local state names to state specifications.
  map<string, StateSpec> state_specs = 4;

  // (Optional) A mapping of local timer names to timer specifications.
  map<string, TimerSpec> timer_specs = 5;

  // Whether the DoFn is splittable.
  bool splittable = 6;

  // (Required if splittable == true) Id of the restriction coder.
  string restriction_coder_id = 7;

  // (Optional) Only set when this ParDo can request bundle finalization.
  bool requests_finalization = 8;
}
// Parameters that a UDF might require.
//
// How a runner sends these parameters to the SDK harness is the subject of
// the Fn API.
//
// How an SDK harness delivers them to the UDF is entirely up to the SDK.
// (For some SDKs there may be parameters that are not represented here if
// the runner doesn't need to do anything.)
//
// Here, the parameters are simply indicators to the runner that it needs to
// run the function a particular way.
//
// TODO: the evolution of the Fn API will influence what needs explicit
// representation here
message Parameter {
  // Wrapper message that scopes the parameter-kind enum.
  message Type {
    enum Enum {
      UNSPECIFIED = 0;
      WINDOW = 1;
      PIPELINE_OPTIONS = 2;
      RESTRICTION_TRACKER = 3;
    }
  }

  // The kind of parameter being requested.
  Type.Enum type = 1;
}
// A state specification, as referenced by ParDoPayload.state_specs. The
// `spec` oneof selects which kind of state is described.
message StateSpec {
  oneof spec {
    ReadModifyWriteStateSpec read_modify_write_spec = 1;
    BagStateSpec bag_spec = 2;
    CombiningStateSpec combining_spec = 3;
    MapStateSpec map_spec = 4;
    SetStateSpec set_spec = 5;
  }
}
// The read-modify-write variant of StateSpec.
message ReadModifyWriteStateSpec {
  // NOTE(review): presumably the id of the Coder (in Components.coders) for
  // the stored value - confirm.
  string coder_id = 1;
}
// The bag variant of StateSpec.
message BagStateSpec {
  // NOTE(review): presumably the id of the Coder (in Components.coders) for
  // the bag's elements - confirm.
  string element_coder_id = 1;
}
// The combining variant of StateSpec.
message CombiningStateSpec {
  // NOTE(review): presumably the id of the Coder (in Components.coders) for
  // the accumulator - confirm.
  string accumulator_coder_id = 1;

  // The SdkFunctionSpec of the combine function.
  SdkFunctionSpec combine_fn = 2;
}
// The map variant of StateSpec.
message MapStateSpec {
  // NOTE(review): presumably the id of the Coder (in Components.coders) for
  // the map's keys - confirm.
  string key_coder_id = 1;

  // NOTE(review): presumably the id of the Coder (in Components.coders) for
  // the map's values - confirm.
  string value_coder_id = 2;
}
// The set variant of StateSpec.
message SetStateSpec {
  // NOTE(review): presumably the id of the Coder (in Components.coders) for
  // the set's elements - confirm.
  string element_coder_id = 1;
}
// A timer specification, as referenced by ParDoPayload.timer_specs.
message TimerSpec {
  // The time domain of the timer; see TimeDomain.
  TimeDomain.Enum time_domain = 1;

  // NOTE(review): presumably the id of the Coder (in Components.coders) used
  // for the timer - confirm.
  string timer_coder_id = 2;
}
// Boundedness marker, used by PCollection.is_bounded and
// ReadPayload.is_bounded.
message IsBounded {
  enum Enum {
    UNSPECIFIED = 0;
    UNBOUNDED = 1;
    BOUNDED = 2;
  }
}
// The payload for the primitive Read transform.
message ReadPayload {
  // (Required) The SdkFunctionSpec of the source for this Read.
  SdkFunctionSpec source = 1;

  // (Required) Whether the source is bounded or unbounded.
  IsBounded.Enum is_bounded = 2;

  // TODO: full audit of fields required by runners as opposed to SDK harness
}
// The payload for the WindowInto transform.
message WindowIntoPayload {
  // (Required) The SdkFunctionSpec of the WindowFn.
  SdkFunctionSpec window_fn = 1;
}
// The payload for the special-but-not-primitive Combine transform.
message CombinePayload {
  // (Required) The SdkFunctionSpec of the CombineFn.
  SdkFunctionSpec combine_fn = 1;

  // (Required) A reference to the Coder to use for accumulators of the
  // CombineFn.
  string accumulator_coder_id = 2;
}
// The payload for the test-only primitive TestStream.
message TestStreamPayload {
  // A single TestStream event; the `event` oneof selects its kind.
  message Event {
    // Event carrying a new watermark value.
    message AdvanceWatermark {
      // NOTE(review): units are not stated here; presumably milliseconds
      // since the epoch - confirm.
      int64 new_watermark = 1;
    }

    // Event carrying a processing-time advance.
    message AdvanceProcessingTime {
      // NOTE(review): units are not stated here; presumably milliseconds -
      // confirm.
      int64 advance_duration = 1;
    }

    // Event carrying a batch of timestamped elements.
    message AddElements {
      repeated TimestampedElement elements = 1;
    }

    oneof event {
      AdvanceWatermark watermark_event = 1;
      AdvanceProcessingTime processing_time_event = 2;
      AddElements element_event = 3;
    }
  }

  // An encoded element paired with a timestamp.
  message TimestampedElement {
    // NOTE(review): presumably encoded with the coder named by `coder_id` -
    // confirm.
    bytes encoded_element = 1;

    // NOTE(review): units are not stated here; presumably milliseconds since
    // the epoch - confirm.
    int64 timestamp = 2;
  }

  // (Required) The coder for elements in the TestStream events.
  string coder_id = 1;

  // The events of the TestStream.
  repeated Event events = 2;
}
// The payload for the special-but-not-primitive WriteFiles transform.
message WriteFilesPayload {
  // (Required) The SdkFunctionSpec of the FileBasedSink.
  SdkFunctionSpec sink = 1;

  // (Required) The format function.
  SdkFunctionSpec format_function = 2;

  // NOTE(review): undocumented in the original; presumably whether the
  // writes are windowed - confirm.
  bool windowed_writes = 3;

  // NOTE(review): undocumented in the original; presumably whether the
  // runner chooses the sharding - confirm.
  bool runner_determined_sharding = 4;

  // Side inputs, keyed by name.
  // NOTE(review): presumably local input names, as in
  // ParDoPayload.side_inputs - confirm.
  map<string, SideInput> side_inputs = 5;
}
// A coder, the binary format for serialization and deserialization of data
// in a pipeline.
message Coder {
  // (Required) A specification for the coder, as a URN plus parameters. This
  // may be a cross-language agreed-upon format, or it may be a "custom
  // coder" that can only be used by a particular SDK. It does not include
  // component coders, as it is beneficial for these to be comprehensible to
  // a runner regardless of whether the binary format is agreed-upon.
  FunctionSpec spec = 1;

  // (Optional) If this coder is parametric, such as ListCoder(VarIntCoder),
  // this is a list of the components. In order for encodings to be
  // identical, the spec and all components must be identical, recursively.
  repeated string component_coder_ids = 2;
}
// Standard coder URNs, attached to enum values through the (beam_urn)
// option.
message StandardCoders {
  enum Enum {
    // Components: None
    BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];

    // Components: None
    STRING_UTF8 = 10 [(beam_urn) = "beam:coder:string_utf8:v1"];

    // Components: The key and value coder, in that order.
    KV = 1 [(beam_urn) = "beam:coder:kv:v1"];

    // Components: None
    BOOL = 12 [(beam_urn) = "beam:coder:bool:v1"];

    // Variable-length encoding of a 64-bit integer.
    // Components: None
    VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];

    // Encodes the floating point value as a big-endian 64-bit integer
    // according to the IEEE 754 double format bit layout.
    // Components: None
    DOUBLE = 11 [(beam_urn) = "beam:coder:double:v1"];

    // Encodes an iterable of elements.
    //
    // The encoding for an iterable [e1...eN] of known length N is
    //
    //    fixed32(N)
    //    encode(e1) encode(e2) encode(e3) ... encode(eN)
    //
    // If the length is unknown, it is batched up into groups of size b1..bM
    // and encoded as
    //
    //     fixed32(-1)
    //     varInt64(b1) encode(e1) encode(e2) ... encode(e_b1)
    //     varInt64(b2) encode(e_(b1+1)) encode(e_(b1+2)) ... encode(e_(b1+b2))
    //     ...
    //     varInt64(bM) encode(e_(N-bM+1)) encode(e_(N-bM+2)) ... encode(eN)
    //     varInt64(0)
    //
    // Components: Coder for a single element.
    ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];

    // Encodes a timer containing a timestamp and a user specified payload.
    // The encoding is represented as: timestamp payload
    //   timestamp - a big endian 8 byte integer representing
    //     millis-since-epoch. The encoded representation is shifted so that
    //     the byte representation of negative values are lexicographically
    //     ordered before the byte representation of positive values. This is
    //     typically done by subtracting -9223372036854775808 from the value
    //     and encoding it as a signed big endian integer. Example values:
    //
    //     -9223372036854775808: 00 00 00 00 00 00 00 00
    //                     -255: 7F FF FF FF FF FF FF 01
    //                       -1: 7F FF FF FF FF FF FF FF
    //                        0: 80 00 00 00 00 00 00 00
    //                        1: 80 00 00 00 00 00 00 01
    //                      256: 80 00 00 00 00 00 01 00
    //      9223372036854775807: FF FF FF FF FF FF FF FF
    //   payload - user defined data, uses the component coder
    // Components: Coder for the payload.
    TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];

    // The following coders are typically not specified manually by the
    // user, but are used at runtime and must be supported by every SDK.

    // Components: None
    INTERVAL_WINDOW = 5 [(beam_urn) = "beam:coder:interval_window:v1"];

    // Components: The coder to attach a length prefix to
    LENGTH_PREFIX = 6 [(beam_urn) = "beam:coder:length_prefix:v1"];

    // Components: None
    GLOBAL_WINDOW = 7 [(beam_urn) = "beam:coder:global_window:v1"];

    // Encodes an element, the window the value is in, the timestamp of the
    // element, and the pane of the element.
    // Components: The element coder and the window coder, in that order
    WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"];

    // Encodes an iterable of elements, some of which may be stored
    // elsewhere.
    //
    // The encoding for a state-backed iterable is the same as that for
    // an iterable, but the final varInt64(0) terminating the set of batches
    // may instead be replaced by
    //
    //     varInt64(-1)
    //     varInt64(len(token))
    //     token
    //
    // where token is an opaque byte string that can be used to fetch the
    // remainder of the iterable (e.g. over the state API).
    //
    // Components: Coder for a single element.
    // Experimental.
    STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
  }
}
// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
// TODO: consider inlining field on PCollection
message WindowingStrategy {
  // (Required) The SdkFunctionSpec of the UDF that assigns windows,
  // merges windows, and shifts timestamps before they are
  // combined according to the OutputTime.
  SdkFunctionSpec window_fn = 1;

  // (Required) Whether or not the window fn is merging.
  //
  // This knowledge is required for many optimizations.
  MergeStatus.Enum merge_status = 2;

  // (Required) The coder for the windows of this PCollection.
  string window_coder_id = 3;

  // (Required) The trigger to use when grouping this PCollection.
  Trigger trigger = 4;

  // (Required) The accumulation mode indicates whether new panes are a full
  // replacement for prior panes or whether they are deltas to be combined
  // with other panes (the combine should correspond to whatever the upstream
  // grouping transform is).
  AccumulationMode.Enum accumulation_mode = 5;

  // (Required) The OutputTime specifies, for a grouping transform, how to
  // compute the aggregate timestamp. The window_fn will first possibly shift
  // it later, then the OutputTime takes the max, min, or ignores it and
  // takes the end of window.
  //
  // This is actually only for input to grouping transforms, but since they
  // may be introduced in runner-specific ways, it is carried along with the
  // windowing strategy.
  OutputTime.Enum output_time = 6;

  // (Required) Indicate when output should be omitted upon window
  // expiration.
  ClosingBehavior.Enum closing_behavior = 7;

  // (Required) The duration, in milliseconds, beyond the end of a window at
  // which the window becomes droppable.
  int64 allowed_lateness = 8;

  // (Required) Indicate whether empty on-time panes should be omitted.
  //
  // NOTE(review): unlike every other field in this message, this field name
  // is PascalCase and identical to the OnTimeBehavior message name. It is
  // left as is because renaming a field changes generated accessors and
  // JSON names.
  OnTimeBehavior.Enum OnTimeBehavior = 9;

  // (Required) Whether or not the window fn assigns inputs to exactly one
  // window.
  //
  // This knowledge is required for some optimizations.
  bool assigns_to_one_window = 10;
}
// Whether a PCollection's WindowFn is non-merging, merging, or
// merging-but-already-merged. In the last case a subsequent GroupByKey is
// almost always going to do something the user does not want.
message MergeStatus {
  enum Enum {
    UNSPECIFIED = 0;

    // The WindowFn does not require merging.
    // Examples: global window, FixedWindows, SlidingWindows
    NON_MERGING = 1;

    // The WindowFn is merging and the PCollection has not had merging
    // performed.
    // Example: Sessions prior to a GroupByKey
    NEEDS_MERGE = 2;

    // The WindowFn is merging and the PCollection has had merging occur
    // already.
    // Example: Sessions after a GroupByKey
    ALREADY_MERGED = 3;
  }
}
// Whether subsequent outputs of aggregations should be entire replacement
// values or just the aggregation of inputs received since the prior output.
message AccumulationMode {
  enum Enum {
    UNSPECIFIED = 0;

    // The aggregation is discarded when it is output.
    DISCARDING = 1;

    // The aggregation is accumulated across outputs.
    ACCUMULATING = 2;

    // The aggregation emits retractions when it is output.
    RETRACTING = 3;
  }
}
// Controls whether or not an aggregating transform should output data
// when a window expires.
message ClosingBehavior {
  enum Enum {
    UNSPECIFIED = 0;

    // Emit output when a window expires, whether or not there has been
    // any new data since the last output.
    EMIT_ALWAYS = 1;

    // Only emit output when new data has arrived since the last output.
    EMIT_IF_NONEMPTY = 2;
  }
}
// Controls whether or not an aggregating transform should output data
// when an on-time pane is empty.
message OnTimeBehavior {
  enum Enum {
    UNSPECIFIED = 0;

    // Always fire the on-time pane. Even if there is no new data since
    // the previous firing, an element will be produced.
    FIRE_ALWAYS = 1;

    // Only fire the on-time pane if there is new data since the previous
    // firing.
    FIRE_IF_NONEMPTY = 2;
  }
}
// The timestamp to give the resulting output when a number of windowed,
// timestamped inputs are aggregated.
message OutputTime {
  enum Enum {
    UNSPECIFIED = 0;

    // The output has the timestamp of the end of the window.
    END_OF_WINDOW = 1;

    // The output has the latest timestamp of the input elements since
    // the last output.
    LATEST_IN_PANE = 2;

    // The output has the earliest timestamp of the input elements since
    // the last output.
    EARLIEST_IN_PANE = 3;
  }
}
// The different time domains in the Beam model.
message TimeDomain {
  enum Enum {
    UNSPECIFIED = 0;

    // Event time is time from the perspective of the data.
    EVENT_TIME = 1;

    // Processing time is time from the perspective of the
    // execution of your pipeline.
    PROCESSING_TIME = 2;

    // Synchronized processing time is the minimum of the
    // processing time of all pending elements.
    //
    // The "processing time" of an element refers to
    // the local processing time at which it was emitted.
    SYNCHRONIZED_PROCESSING_TIME = 3;
  }
}
// A small DSL for expressing when to emit new aggregations
// from a GroupByKey or CombinePerKey.
//
// A trigger is described in terms of when it is _ready_ to permit output.
message Trigger {
  // Ready when all subtriggers are ready.
  message AfterAll {
    repeated Trigger subtriggers = 1;
  }

  // Ready when any subtrigger is ready.
  message AfterAny {
    repeated Trigger subtriggers = 1;
  }

  // Starting with the first subtrigger, ready when the _current_ subtrigger
  // is ready. After output, advances the current trigger by one.
  message AfterEach {
    repeated Trigger subtriggers = 1;
  }

  // Ready after the input watermark is past the end of the window.
  //
  // May have implicitly-repeated subtriggers for early and late firings.
  // When the end of the window is reached, the trigger transitions between
  // the subtriggers.
  message AfterEndOfWindow {
    // (Optional) A trigger governing output prior to the end of the window.
    Trigger early_firings = 1;

    // (Optional) A trigger governing output after the end of the window.
    Trigger late_firings = 2;
  }

  // After input arrives, ready when the specified delay has passed.
  message AfterProcessingTime {
    // (Required) The transforms to apply to an arriving element's timestamp,
    // in order.
    repeated TimestampTransform timestamp_transforms = 1;
  }

  // Ready whenever upstream processing time has all caught up with
  // the arrival time of an input element.
  message AfterSynchronizedProcessingTime {
  }

  // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
  // specially denoted to indicate the user did not alter the triggering.
  message Default {
  }

  // Ready whenever the requisite number of input elements have arrived.
  message ElementCount {
    int32 element_count = 1;
  }

  // Never ready. There will only be an ON_TIME output and a final
  // output at window expiration.
  message Never {
  }

  // Always ready. This can also be expressed as ElementCount(1) but
  // is more explicit.
  message Always {
  }

  // Ready whenever either of its subtriggers are ready, but finishes output
  // when the finally subtrigger fires.
  message OrFinally {
    // (Required) Trigger governing main output; may fire repeatedly.
    Trigger main = 1;

    // (Required) Trigger governing termination of output.
    Trigger finally = 2;
  }

  // Ready whenever the subtrigger is ready; resets state when the subtrigger
  // completes.
  message Repeat {
    // (Required) Trigger that is run repeatedly.
    Trigger subtrigger = 1;
  }

  // The full disjoint union of possible triggers.
  oneof trigger {
    AfterAll after_all = 1;
    AfterAny after_any = 2;
    AfterEach after_each = 3;
    AfterEndOfWindow after_end_of_window = 4;
    AfterProcessingTime after_processing_time = 5;
    AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
    Always always = 12;
    Default default = 7;
    ElementCount element_count = 8;
    Never never = 9;
    OrFinally or_finally = 10;
    Repeat repeat = 11;
  }
}
// A specification for a transformation on a timestamp.
//
// Primarily used by AfterProcessingTime triggers to transform
// the arrival time of input to a target time for firing.
message TimestampTransform {
  message Delay {
    // (Required) The delay, in milliseconds.
    int64 delay_millis = 1;
  }

  // NOTE(review): field numbers here start at 3; numbers 1 and 2 are unused
  // and not marked reserved - confirm whether they belonged to removed
  // fields.
  message AlignTo {
    // (Required) A duration to which delays should be quantized,
    // in milliseconds.
    int64 period = 3;

    // (Required) An offset from 0 for the quantization specified by
    // `period`, in milliseconds.
    int64 offset = 4;
  }

  // The kind of transformation to apply.
  oneof timestamp_transform {
    Delay delay = 1;
    AlignTo align_to = 2;
  }
}
// A specification for how to "side input" a PCollection.
message SideInput {
  // (Required) URN of the access pattern required by the `view_fn` to
  // present the desired SDK-specific interface to a UDF.
  //
  // This access pattern defines the SDK harness <-> Runner Harness RPC
  // interface for accessing a side input.
  //
  // The only access pattern intended for Beam, because of its superior
  // performance possibilities, is the multimap one.
  // NOTE(review): the original comment gave the URN only as
  // "beam:sideinput:multimap" (or some such URN); StandardSideInputTypes
  // declares "beam:side_input:multimap:v1" - confirm that is the URN meant.
  FunctionSpec access_pattern = 1;

  // (Required) The SdkFunctionSpec of the UDF that adapts a particular
  // access_pattern to a user-facing view type.
  //
  // For example, View.asSingleton() may include a `view_fn` that adapts a
  // specially-designed multimap to a single value per window.
  SdkFunctionSpec view_fn = 2;

  // (Required) The SdkFunctionSpec of the UDF that maps a main input window
  // to a side input window.
  //
  // For example, when the main input is in fixed windows of one hour, this
  // can specify that the side input should be accessed according to the day
  // in which that hour falls.
  SdkFunctionSpec window_mapping_fn = 3;
}
// An environment for executing UDFs. By default, an SDK container URL, but
// it can also be a process forked by a command, or an externally managed
// process.
message Environment {
  // Field number 1 is reserved and must not be reused.
  reserved 1;

  // (Required) The URN of the payload.
  string urn = 2;

  // (Optional) The data specifying any parameters to the URN. If
  // the URN does not require any arguments, this may be omitted.
  bytes payload = 3;
}
// Holder for the enum of standard environment URNs. Each enum value carries
// its URN in the (beam_urn) option; code should read the URN from that option
// rather than hard-coding the string.
message StandardEnvironments {
// The standard kinds of environment.
//
// NOTE(review): DOCKER is the zero value, so a proto3 field of this enum type
// that is left unset reads as DOCKER.
enum Environments {
DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"]; // A managed Docker container to run user code.
PROCESS = 1 [(beam_urn) = "beam:env:process:v1"]; // A managed native process to run user code.
EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"]; // An external, non-managed process to run user code.
}
}
// The payload describing a Docker image.
// NOTE(review): presumably the payload of an Environment whose URN is
// "beam:env:docker:v1" -- confirm.
message DockerPayload {
// The container image to use. No OS/architecture is carried in this message;
// see the trailing note on the field.
string container_image = 1; // implicitly linux_amd64.
}
// The payload describing a process: the operating system and architecture,
// the command to execute, and the environment variables.
// NOTE(review): presumably the payload of an Environment whose URN is
// "beam:env:process:v1" -- confirm.
message ProcessPayload {
string os = 1; // "linux", "darwin", ..
string arch = 2; // "amd64", ..
string command = 3; // process to execute
map<string, string> env = 4; // Environment variables
}
// The payload describing an external environment: a service endpoint plus
// arbitrary extra parameters.
// NOTE(review): presumably the payload of an Environment whose URN is
// "beam:env:external:v1" -- confirm.
message ExternalPayload {
// The service endpoint associated with the external environment.
// NOTE(review): the direction of the connection (who dials whom) is not
// stated here -- confirm against the consumers of this message.
ApiServiceDescriptor endpoint = 1;
map<string, string> params = 2; // Arbitrary extra parameters to pass
}
// A specification of a user defined function.
//
// Pairs the function's FunctionSpec with the id of an environment that is
// capable of invoking it.
message SdkFunctionSpec {
// (Required) A full specification of this function.
FunctionSpec spec = 1;
// (Required) Reference to an execution environment capable of
// invoking this function.
string environment_id = 2;
}
// Custom options that can be attached to enum values in this package.
extend google.protobuf.EnumValueOptions {
// An extension to be used for specifying the standard URN of various
// pipeline entities, e.g. transforms, functions, coders etc.
// Code should refer to the URNs of those entities by extracting
// it from the (beam_urn) extension, rather than by hard-coding
// the URN.
//
// The recommended pattern for declaring it is (exemplified by coders):
//
// message StandardCoders {
// enum Enum {
// BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];
// ...
// }
// }
//
// If there are multiple categories of entities of this type, use the
// following pattern (exemplified by PTransforms):
//
// message StandardPTransforms {
// enum Primitives {
// ...
// }
// enum Composites {
// ...
// }
// }
string beam_urn = 185324356;
// An extension for attaching other constants to an enum value, encoded as a
// string. For example, BeamConstants.Constants uses it to carry the
// minimum and maximum timestamps in milliseconds.
string beam_constant = 185324357;
}
// A URN along with a parameter object whose schema is determined by the
// URN.
//
// This structure is reused in two distinct, but compatible, ways:
//
// 1. This can be a specification of the function over PCollections
// that a PTransform computes.
// 2. This can be a specification of a user-defined function, possibly
// SDK-specific. (external to this message must be adequate context
// to indicate the environment in which the UDF can be understood).
//
// Though not explicit in this proto, there are two possibilities
// for the relationship of a runner to this specification that
// one should bear in mind:
//
// 1. The runner understands the URN. For example, it might be
// a well-known URN like "beam:transform:Top" or
// "beam:windowfn:FixedWindows" with
// an agreed-upon payload (e.g. a number or duration,
// respectively).
// 2. The runner does not understand the URN. It might be an
// SDK specific URN such as "beam:dofn:javasdk:1.0"
// that indicates to the SDK what the payload is,
// such as a serialized Java DoFn from a particular
// version of the Beam Java SDK. The payload will often
// then be an opaque message such as bytes in a
// language-specific serialization format.
message FunctionSpec {
// Field number 2 is skipped by the fields of this message. It is reserved
// so that it cannot be assigned to a new field by accident: if older
// writers ever emitted data under this number, reusing it would silently
// reinterpret that data. This mirrors how Environment reserves its own
// skipped field number.
reserved 2;
// (Required) A URN that describes the accompanying payload.
// For any URN that is not recognized (by whomever is inspecting
// it) the parameter payload should be treated as opaque and
// passed as-is.
string urn = 1;
// (Optional) The data specifying any parameters to the URN. If
// the URN does not require any arguments, this may be omitted.
bytes payload = 3;
}
// TODO: transfer javadoc here
//
// A collection of display data items. Each item is identified by the
// transform it originates from plus a key, and carries a typed value.
message DisplayData {
// (Required) The list of display data.
repeated Item items = 1;
// A complete identifier for a DisplayData.Item
message Identifier {
// (Required) The transform originating this display data.
string transform_id = 1;
// (Optional) The URN indicating the type of the originating transform,
// if there is one.
string transform_urn = 2;
// NOTE(review): undocumented in the original; presumably the key that names
// this item within the originating transform -- confirm.
string key = 3;
}
// A single item of display data.
message Item {
// (Required) The identifier of this item.
Identifier id = 1;
// (Required) The type of this item's value; see Type.Enum.
Type.Enum type = 2;
// (Required) The value of this item, wrapped in an Any.
google.protobuf.Any value = 3;
// (Optional) NOTE(review): presumably an abbreviated form of `value` --
// confirm.
google.protobuf.Any short_value = 4;
// (Optional) NOTE(review): presumably a human-readable label for this
// item -- confirm.
string label = 5;
// (Optional) NOTE(review): presumably a URL with more information about
// this item -- confirm.
string link_url = 6;
}
// Wrapper message that scopes the enum of display data value types.
message Type {
// The type of the value carried by an Item.
enum Enum {
// Zero value; this is what an unset `type` field reads as.
UNSPECIFIED = 0;
STRING = 1;
INTEGER = 2;
FLOAT = 3;
BOOLEAN = 4;
TIMESTAMP = 5;
DURATION = 6;
JAVA_CLASS = 7;
}
}
}
// The following messages are not part of the RunnerApi specification,
// but may be useful for graph construction and manipulation.
// A disjoint union of all the things that may contain references
// that require Components to resolve.
//
// Bundling the root message with its Components makes the message
// self-contained.
message MessageWithComponents {
// (Optional) The by-reference components of the root message,
// enabling a standalone message.
//
// If this is absent, it is expected that there are no
// references.
Components components = 1;
// (Required) The root message that may contain pointers
// that should be resolved by looking inside components.
//
// Because this is a oneof, at most one member is set at a time.
//
// NOTE(review): the member numbers skip 5 and 10. If those numbers belonged
// to removed members, consider adding `reserved 5, 10;` to this message so
// they cannot be reused.
oneof root {
Coder coder = 2;
CombinePayload combine_payload = 3;
SdkFunctionSpec sdk_function_spec = 4;
ParDoPayload par_do_payload = 6;
PTransform ptransform = 7;
PCollection pcollection = 8;
ReadPayload read_payload = 9;
SideInput side_input = 11;
WindowIntoPayload window_into_payload = 12;
WindowingStrategy windowing_strategy = 13;
FunctionSpec function_spec = 14;
}
}
// The payload for an executable stage. This will eventually be passed to an SDK in the form of a
// ProcessBundleDescriptor.
//
// An executable stage names one input PCollection, the PTransforms it contains, its output
// PCollections, and the side inputs, user states and timers those PTransforms use, together
// with the Components needed to resolve all of these ids.
message ExecutableStagePayload {
// (Required) Environment in which this stage executes.
//
// We use an environment rather than environment id
// because ExecutableStages use environments directly. This may change in the future.
Environment environment = 1;
// (Required) Input PCollection id. This must be present as a value in the inputs of any
// PTransform the ExecutableStagePayload is the payload of.
string input = 2;
// The side inputs required for this executable stage. Each side input of each PTransform within
// this ExecutableStagePayload must be represented within this field.
repeated SideInputId side_inputs = 3;
// PTransform ids contained within this executable stage. This must contain at least one
// PTransform id.
repeated string transforms = 4;
// Output PCollection ids. This must be equal to the values of the outputs of any
// PTransform the ExecutableStagePayload is the payload of.
repeated string outputs = 5;
// (Required) The components for the Executable Stage. This must contain all of the Transforms
// in transforms, and the closure of all of the components they reference.
Components components = 6;
// The user states required for this executable stage. Each user state of each PTransform within
// this ExecutableStagePayload must be represented within this field.
repeated UserStateId user_states = 7;
// The timers required for this executable stage. Each timer of each PTransform within
// this ExecutableStagePayload must be represented within this field.
repeated TimerId timers = 8;
// A reference to a side input. Side inputs are uniquely identified by PTransform id and
// local name.
message SideInputId {
// (Required) The id of the PTransform that references this side input.
string transform_id = 1;
// (Required) The local name of this side input from the PTransform that references it.
string local_name = 2;
}
// A reference to user state. User states are uniquely identified by PTransform id and
// local name.
message UserStateId {
// (Required) The id of the PTransform that references this user state.
string transform_id = 1;
// (Required) The local name of this user state for the PTransform that references it.
string local_name = 2;
}
// A reference to a timer. Timers are uniquely identified by PTransform id and
// local name.
message TimerId {
// (Required) The id of the PTransform that references this timer.
string transform_id = 1;
// (Required) The local name of this timer for the PTransform that references it.
string local_name = 2;
}
}