/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers describing the Runner API, which is the runner-independent,
* SDK-independent definition of the Beam model.
*/
syntax = "proto3";
package org.apache.beam.model.pipeline.v1;
option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";
import "endpoints.proto";
import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";
message BeamConstants {
enum Constants {
// All timestamps in milliseconds since Jan 1, 1970.
// All timestamps of elements or window boundaries must be within
// the interval [MIN_TIMESTAMP_MILLIS, MAX_TIMESTAMP_MILLIS].
// The smallest representable timestamp of an element or a window boundary.
MIN_TIMESTAMP_MILLIS = 0 [(beam_constant) = "-9223372036854775"];
// The largest representable timestamp of an element or a window boundary.
MAX_TIMESTAMP_MILLIS = 1 [(beam_constant) = "9223372036854775"];
// The maximum timestamp for the global window.
// Triggers use the max timestamp to set timers' timestamps. Timers fire when
// the watermark passes their timestamps, so this timestamp needs to be
// smaller than MAX_TIMESTAMP_MILLIS.
// One standard day is subtracted from MAX_TIMESTAMP_MILLIS to make sure
// the max timestamp is smaller than MAX_TIMESTAMP_MILLIS even after rounding up
// to seconds or minutes.
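//
// As a quick check of the arithmetic (one standard day = 86400000 ms):
// 9223372036854775 - 86400000 = 9223371950454775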
GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS = 2 [(beam_constant) = "9223371950454775"];
}
}
// A set of mappings from id to message. This is included as an optional field
// on any proto message that may contain references needing resolution.
message Components {
// (Required) A map from pipeline-scoped id to PTransform.
map<string, PTransform> transforms = 1;
// (Required) A map from pipeline-scoped id to PCollection.
map<string, PCollection> pcollections = 2;
// (Required) A map from pipeline-scoped id to WindowingStrategy.
map<string, WindowingStrategy> windowing_strategies = 3;
// (Required) A map from pipeline-scoped id to Coder.
map<string, Coder> coders = 4;
// (Required) A map from pipeline-scoped id to Environment.
map<string, Environment> environments = 5;
}
// A Pipeline is a hierarchical graph of PTransforms, linked
// by PCollections. A typical graph may look like:
//
// Impulse -> PCollection -> ParDo -> PCollection -> GroupByKey -> ...
//                                                \> PCollection -> ParDo -> ...
//                                                               \> ParDo -> ...
// Impulse -> PCollection -> ParDo -> PCollection -> ...
//
// This is represented by a number of by-reference maps to transforms,
// PCollections, SDK environments, coders, etc., for
// supporting compact reuse and arbitrary graph structure.
message Pipeline {
// (Required) The coders, UDFs, graph nodes, etc, that make up
// this pipeline.
Components components = 1;
// (Required) The ids of all PTransforms that are not contained within another
// PTransform. These must be in shallow topological order, so that traversing
// them recursively in this order yields a recursively topological traversal.
repeated string root_transform_ids = 2;
// (Optional) Static display data for the pipeline. If there is none,
// it may be omitted.
repeated DisplayData display_data = 3;
// (Optional) A set of requirements that the runner MUST understand and be
// able to faithfully provide in order to execute this pipeline. These
// may indicate that a runner must inspect new fields on a component or
// provide additional guarantees when processing specific transforms.
// A runner should reject any pipelines with unknown requirements.
repeated string requirements = 4;
}
// Transforms are the operations in your pipeline, and provide a generic
// processing framework. You provide processing logic in the form of a function
// object (colloquially referred to as “user code”), and your user code is
// applied to each element of an input PCollection (or more than one
// PCollection). Depending on the pipeline runner and back-end that you choose,
// many different workers across a cluster may execute instances of your user
// code in parallel. The user code running on each worker generates the output
// elements that are ultimately added to the final output PCollection that the
// transform produces.
//
// The Beam SDKs contain a number of different transforms that you can apply to
// your pipeline’s PCollections. These include general-purpose core transforms,
// such as ParDo or Combine. There are also pre-written composite transforms
// included in the SDKs, which combine one or more of the core transforms in a
// useful processing pattern, such as counting or combining elements in a
// collection. You can also define your own more complex composite transforms to
// fit your pipeline’s exact use case.
message PTransform {
// (Required) A unique name for the application node.
//
// Ideally, this should be stable over multiple evolutions of a pipeline
// for the purposes of logging and associating pipeline state with a node,
// etc.
//
// If it is not stable, then the runner decides what will happen. But, most
// importantly, it must always be here and be unique, even if it is
// autogenerated.
string unique_name = 5;
// (Optional) A URN and payload that, together, fully define the semantics
// of this transform.
//
// If absent, this must be an "anonymous" composite transform.
//
// For primitive transforms in the Runner API, this is required, and the
// payloads are well-defined messages. When the URN indicates ParDo it
// is a ParDoPayload, and so on. For some special composite transforms,
// the payload is also officially defined. See StandardPTransforms for
// details.
FunctionSpec spec = 1;
// (Optional) A list of the ids of transforms that it contains.
//
// Primitive transforms are not allowed to specify this.
repeated string subtransforms = 2;
// (Required) A map from local names of inputs (unique only within this map, and
// likely embedded in the transform payload and serialized user code) to
// PCollection ids.
//
// The payload for this transform may clarify the relationship of these
// inputs. For example:
//
// - for a Flatten transform they are merged
// - for a ParDo transform, some may be side inputs
//
// All inputs are recorded here so that the topological ordering of
// the graph is consistent whether or not the payload is understood.
map<string, string> inputs = 3;
// (Required) A map from local names of outputs (unique only within this map,
// and likely embedded in the transform payload and serialized user code)
// to PCollection ids.
//
// The URN or payload for this transform node may clarify the type and
// relationship of these outputs. For example:
//
// - for a ParDo transform, these are tags on PCollections, which will be
// embedded in the DoFn.
map<string, string> outputs = 4;
// (Optional) Static display data for this PTransform application. If
// there is none, it may be omitted.
repeated DisplayData display_data = 6;
// The environment in which the current PTransform should be executed.
//
// Transforms that are required to be implemented by a runner must omit this.
// All other transforms are required to specify this.
string environment_id = 7;
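//
// As an illustrative sketch (proto text format, with hypothetical ids such as
// "pc_1" and "java_env"), a single ParDo application might look like:
//
//   unique_name: "MyStep/ParDo(MyDoFn)"
//   spec { urn: "beam:transform:pardo:v1" payload: "<serialized ParDoPayload>" }
//   inputs { key: "main_input" value: "pc_1" }
//   outputs { key: "output" value: "pc_2" }
//   environment_id: "java_env"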
}
message StandardPTransforms {
// Primitive transforms may not specify composite sub-transforms.
enum Primitives {
// ParDo is a Beam transform for generic parallel processing. The ParDo
// processing paradigm is similar to the “Map” phase of a
// Map/Shuffle/Reduce-style algorithm: a ParDo transform considers each
// element in the input PCollection, performs some processing function
// (your user code) on that element, and emits zero, one, or multiple
// elements to an output PCollection.
//
// See https://beam.apache.org/documentation/programming-guide/#pardo
// for additional details.
//
// Payload: ParDoPayload
PAR_DO = 0 [(beam_urn) = "beam:transform:pardo:v1"];
// Flatten is a Beam transform for PCollection objects that store the same
// data type. Flatten merges multiple PCollection objects into a single
// logical PCollection.
//
// See https://beam.apache.org/documentation/programming-guide/#flatten
// for additional details.
//
// Payload: None
FLATTEN = 1 [(beam_urn) = "beam:transform:flatten:v1"];
// GroupByKey is a Beam transform for processing collections of key/value
// pairs. It’s a parallel reduction operation, analogous to the Shuffle
// phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is
// a collection of key/value pairs that represents a multimap, where the
// collection contains multiple pairs that have the same key, but different
// values. Given such a collection, you use GroupByKey to collect all of the
// values associated with each unique key.
//
// See https://beam.apache.org/documentation/programming-guide/#groupbykey
// for additional details.
//
// Never defines an environment as the runner is required to implement this
// transform.
//
// Payload: None
GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];
// A transform which produces a single empty byte array at the minimum
// timestamp in the GlobalWindow.
//
// Never defines an environment as the runner is required to implement this
// transform.
//
// Payload: None
IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];
// Windowing subdivides a PCollection according to the timestamps of its
// individual elements. Transforms that aggregate multiple elements, such as
// GroupByKey and Combine, work implicitly on a per-window basis — they
// process each PCollection as a succession of multiple, finite windows,
// though the entire collection itself may be of unbounded size.
//
// See https://beam.apache.org/documentation/programming-guide/#windowing
// for additional details.
//
// Payload: WindowIntoPayload
ASSIGN_WINDOWS = 4 [(beam_urn) = "beam:transform:window_into:v1"];
// A testing input that generates an unbounded {@link PCollection} of
// elements, advancing the watermark and processing time as elements are
// emitted. After all of the specified elements are emitted, ceases to
// produce output.
//
// See https://beam.apache.org/blog/2016/10/20/test-stream.html
// for additional details.
//
// Payload: TestStreamPayload
TEST_STREAM = 5 [(beam_urn) = "beam:transform:teststream:v1"];
// Represents mapping of main input window onto side input window.
//
// Side input window mapping function:
// Input: KV<nonce, MainInputWindow>
// Output: KV<nonce, SideInputWindow>
//
// For each main input window, the side input window is returned. The
// nonce is used by a runner to associate each input with its output.
// The nonce is represented as an opaque set of bytes.
//
// Payload: SideInput#window_mapping_fn FunctionSpec
MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];
// Used to merge windows during a GroupByKey.
//
// Window merging function:
// Input: KV<nonce, iterable<OriginalWindow>>
// Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow, iterable<ConsumedOriginalWindow>>>>>
//
// For each set of original windows, a list of all unmerged windows is
// output alongside a map of merged window to set of consumed windows.
// All original windows must be contained in either the unmerged original
// window set or one of the consumed original window sets. Each original
// window can only be part of one output set. The nonce is used by a runner
// to associate each input with its output. The nonce is represented as an
// opaque set of bytes.
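//
// For example, given original windows [w1, w2, w3] where w1 and w2 merge
// into w12 and w3 remains unmerged, the output for that nonce would be
// KV(nonce, KV([w3], [KV(w12, [w1, w2])])).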
//
// Payload: WindowingStrategy#window_fn FunctionSpec
MERGE_WINDOWS = 7 [(beam_urn) = "beam:transform:merge_windows:v1"];
}
enum DeprecatedPrimitives {
// Represents the operation to read a Bounded or Unbounded source.
// Payload: ReadPayload.
READ = 0 [(beam_urn) = "beam:transform:read:v1"];
// Runners should move away from translating `CreatePCollectionView` and treat this as
// part of the translation for a `ParDo` side input.
CREATE_VIEW = 1 [(beam_urn) = "beam:transform:create_view:v1"];
}
enum Composites {
// Represents the Combine.perKey() operation.
// If this is produced by an SDK, it is assumed that the SDK understands
// each of CombineComponents.
// Payload: CombinePayload
COMBINE_PER_KEY = 0 [(beam_urn) = "beam:transform:combine_per_key:v1"];
// Represents the Combine.globally() operation.
// If this is produced by an SDK, it is assumed that the SDK understands
// each of CombineComponents.
// Payload: CombinePayload
COMBINE_GLOBALLY = 1 [(beam_urn) = "beam:transform:combine_globally:v1"];
// Represents the Reshuffle operation.
RESHUFFLE = 2 [(beam_urn) = "beam:transform:reshuffle:v1"];
// Less well-known. Payload: WriteFilesPayload.
WRITE_FILES = 3 [(beam_urn) = "beam:transform:write_files:v1"];
}
// Payload for all of these: CombinePayload
enum CombineComponents {
// Represents the Pre-Combine part of a lifted Combine Per Key, as described
// in the following document:
// https://s.apache.org/beam-runner-api-combine-model#heading=h.ta0g6ase8z07
// Payload: CombinePayload
COMBINE_PER_KEY_PRECOMBINE = 0 [(beam_urn) = "beam:transform:combine_per_key_precombine:v1"];
// Represents the Merge Accumulators part of a lifted Combine Per Key, as
// described in the following document:
// https://s.apache.org/beam-runner-api-combine-model#heading=h.jco9rvatld5m
// Payload: CombinePayload
COMBINE_PER_KEY_MERGE_ACCUMULATORS = 1 [(beam_urn) = "beam:transform:combine_per_key_merge_accumulators:v1"];
// Represents the Extract Outputs part of a lifted Combine Per Key, as
// described in the following document:
// https://s.apache.org/beam-runner-api-combine-model#heading=h.i9i6p8gtl6ku
// Payload: CombinePayload
COMBINE_PER_KEY_EXTRACT_OUTPUTS = 2 [(beam_urn) = "beam:transform:combine_per_key_extract_outputs:v1"];
// Represents the Combine Grouped Values transform, as described in the
// following document:
// https://s.apache.org/beam-runner-api-combine-model#heading=h.aj86ew4v1wk
// Payload: CombinePayload
COMBINE_GROUPED_VALUES = 3 [(beam_urn) = "beam:transform:combine_grouped_values:v1"];
// Represents the Convert To Accumulators transform, as described in the
// following document:
// https://s.apache.org/beam-runner-api-combine-model#heading=h.h5697l1scd9x
// Payload: CombinePayload
COMBINE_PER_KEY_CONVERT_TO_ACCUMULATORS = 4
[(beam_urn) =
"beam:transform:combine_per_key_convert_to_accumulators:v1"];
}
// Payload for all of these: ParDoPayload containing the user's SDF
enum SplittableParDoComponents {
// Pairs the input element with its initial restriction.
// Input: element; output: KV(element, restriction).
PAIR_WITH_RESTRICTION = 0 [(beam_urn) = "beam:transform:sdf_pair_with_restriction:v1"];
// Splits the restriction of each element/restriction pair and returns the
// resulting splits, with a corresponding floating point size estimation
// for each.
//
// A reasonable value for size is the number of bytes expected to be
// produced by this (element, restriction) pair.
//
// Input: KV(element, restriction)
// Output: KV(KV(element, restriction), size)
SPLIT_AND_SIZE_RESTRICTIONS = 1 [(beam_urn) = "beam:transform:sdf_split_and_size_restrictions:v1"];
// Applies the DoFn to every element and restriction.
//
// All primary and residuals returned from checkpointing or splitting must
// have the same type as the input to this transform.
//
// Input: KV(KV(element, restriction), size); output: DoFn's output.
PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 2 [(beam_urn) = "beam:transform:sdf_process_sized_element_and_restrictions:v1"];
// Truncates the restriction of each element/restriction pair and returns
// the finite restriction which will be processed when a pipeline is
// drained. See
// https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#
// for additional details about drain.
//
// Input: KV(KV(element, restriction), size);
// Output: KV(KV(element, restriction), size).
TRUNCATE_SIZED_RESTRICTION = 3 [(beam_urn) = "beam:transform:sdf_truncate_sized_restrictions:v1"];
}
}
message StandardSideInputTypes {
enum Enum {
// Represents a view over a PCollection<V>.
//
// StateGetRequests performed on this side input must use
// StateKey.IterableSideInput.
ITERABLE = 0 [(beam_urn) = "beam:side_input:iterable:v1"];
// Represents a view over a PCollection<KV<K, V>>.
//
// StateGetRequests performed on this side input must use
// StateKey.IterableSideInput or StateKey.MultimapSideInput.
MULTIMAP = 1 [(beam_urn) = "beam:side_input:multimap:v1"];
}
}
// A PCollection!
message PCollection {
// (Required) A unique name for the PCollection.
//
// Ideally, this should be stable over multiple evolutions of a pipeline
// for the purposes of logging and associating pipeline state with a node,
// etc.
//
// If it is not stable, then the runner decides what will happen. But, most
// importantly, it must always be here, even if it is autogenerated.
string unique_name = 1;
// (Required) The id of the Coder for this PCollection.
string coder_id = 2;
// (Required) Whether this PCollection is bounded or unbounded
IsBounded.Enum is_bounded = 3;
// (Required) The id of the windowing strategy for this PCollection.
string windowing_strategy_id = 4;
// (Optional) Static display data for the PCollection. If there is none,
// it may be omitted.
repeated DisplayData display_data = 5;
}
// The payload for the primitive ParDo transform.
message ParDoPayload {
// (Required) The FunctionSpec of the DoFn.
FunctionSpec do_fn = 1;
// (Optional) A mapping of local input names to side inputs, describing
// the expected access pattern.
map<string, SideInput> side_inputs = 3;
// (Optional) A mapping of local state names to state specifications.
// If this is set, the stateful processing requirement should also
// be placed in the pipeline requirements.
map<string, StateSpec> state_specs = 4;
// (Optional) A mapping of local timer family names to timer family
// specifications. If this is set, the stateful processing requirement should
// also be placed in the pipeline requirements.
map<string, TimerFamilySpec> timer_family_specs = 9;
// (Optional) Only set when this ParDo contains a splittable DoFn.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
string restriction_coder_id = 7;
// (Optional) Only set when this ParDo can request bundle finalization.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
bool requests_finalization = 8;
// Whether this stage requires time sorted input.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
bool requires_time_sorted_input = 10;
// Whether this stage requires stable input.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
bool requires_stable_input = 11;
reserved 6;
}
message StateSpec {
oneof spec {
ReadModifyWriteStateSpec read_modify_write_spec = 1;
BagStateSpec bag_spec = 2;
CombiningStateSpec combining_spec = 3;
MapStateSpec map_spec = 4;
SetStateSpec set_spec = 5;
OrderedListStateSpec ordered_list_spec = 6;
}
}
message ReadModifyWriteStateSpec {
string coder_id = 1;
}
message BagStateSpec {
string element_coder_id = 1;
}
message OrderedListStateSpec {
string element_coder_id = 1;
}
message CombiningStateSpec {
string accumulator_coder_id = 1;
FunctionSpec combine_fn = 2;
}
message MapStateSpec {
string key_coder_id = 1;
string value_coder_id = 2;
}
message SetStateSpec {
string element_coder_id = 1;
}
message TimerFamilySpec {
TimeDomain.Enum time_domain = 1;
string timer_family_coder_id = 2;
}
message IsBounded {
enum Enum {
UNSPECIFIED = 0;
UNBOUNDED = 1;
BOUNDED = 2;
}
}
// The payload for the primitive Read transform.
message ReadPayload {
// (Required) The FunctionSpec of the source for this Read.
FunctionSpec source = 1;
// (Required) Whether the source is bounded or unbounded
IsBounded.Enum is_bounded = 2;
// TODO: full audit of fields required by runners as opposed to SDK harness
}
// The payload for the WindowInto transform.
message WindowIntoPayload {
// (Required) The FunctionSpec of the WindowFn.
FunctionSpec window_fn = 1;
}
// The payload for the special-but-not-primitive Combine transform.
message CombinePayload {
// (Required) The FunctionSpec of the CombineFn.
FunctionSpec combine_fn = 1;
// (Required) A reference to the Coder to use for accumulators of the CombineFn
string accumulator_coder_id = 2;
}
// The payload for the test-only primitive TestStream
message TestStreamPayload {
// (Required) the coder for elements in the TestStream events
string coder_id = 1;
// (Optional) If specified, the TestStream will replay these events.
repeated Event events = 2;
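// As an illustrative sketch (proto text format, with a placeholder
// encoded_element), a short replayed sequence might look like:
//
//   events { element_event { elements { encoded_element: "..." timestamp: 0 } } }
//   events { watermark_event { new_watermark: 1000 } }
//   events { processing_time_event { advance_duration: 60000 } }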
// (Optional) If specified, points to a TestStreamService to be
// used to retrieve events.
ApiServiceDescriptor endpoint = 3;
message Event {
oneof event {
AdvanceWatermark watermark_event = 1;
AdvanceProcessingTime processing_time_event = 2;
AddElements element_event = 3;
}
// Advances the watermark to the specified timestamp.
message AdvanceWatermark {
// (Required) The watermark to advance to.
int64 new_watermark = 1;
// (Optional) The output watermark tag for a PCollection. If unspecified or
// empty, this defaults to the main output PCollection.
string tag = 2;
}
// Advances the processing time clock by the specified amount.
message AdvanceProcessingTime {
// (Required) The duration to advance by.
int64 advance_duration = 1;
}
// Adds elements to the stream to be emitted.
message AddElements {
// (Required) The elements to add to the TestStream.
repeated TimestampedElement elements = 1;
// (Optional) The output PCollection tag to add these elements to. If
// unspecified or empty, this defaults to the main output PCollection.
string tag = 3;
}
}
// A single element inside of the TestStream.
message TimestampedElement {
// (Required) The element encoded. Currently the TestStream only supports
// encoding primitives.
bytes encoded_element = 1;
// (Required) The event timestamp of this element.
int64 timestamp = 2;
}
}
service TestStreamService {
// A TestStream will request events using this RPC.
rpc Events(EventsRequest) returns (stream TestStreamPayload.Event) {}
}
message EventsRequest {
// The set of PCollections to read from. These are the local names of the
// PTransform's outputs and are a subset of the TestStream's outputs. This
// allows Interactive Beam to cache many PCollections from a pipeline and then
// replay a subset of them.
repeated string output_ids = 1;
}
// The payload for the special-but-not-primitive WriteFiles transform.
message WriteFilesPayload {
// (Required) The FunctionSpec of the FileBasedSink.
FunctionSpec sink = 1;
// (Required) The format function.
FunctionSpec format_function = 2;
bool windowed_writes = 3;
bool runner_determined_sharding = 4;
map<string, SideInput> side_inputs = 5;
}
// A coder, the binary format for serialization and deserialization of data in
// a pipeline.
message Coder {
// (Required) A specification for the coder, as a URN plus parameters. This
// may be a cross-language agreed-upon format, or it may be a "custom coder"
// that can only be used by a particular SDK. It does not include component
// coders, as it is beneficial for these to be comprehensible to a runner
// regardless of whether the binary format is agreed-upon.
FunctionSpec spec = 1;
// (Optional) If this coder is parametric, such as ListCoder(VarIntCoder),
// this is a list of the components. In order for encodings to be identical,
// the FunctionSpec and all components must be identical, recursively.
repeated string component_coder_ids = 2;
}
message StandardCoders {
enum Enum {
// Components: None
BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];
// Components: None
STRING_UTF8 = 10 [(beam_urn) = "beam:coder:string_utf8:v1"];
// Components: The key and value coder, in that order.
KV = 1 [(beam_urn) = "beam:coder:kv:v1"];
// Components: None
BOOL = 12 [(beam_urn) = "beam:coder:bool:v1"];
// Encodes a 64-bit integer using a variable-length encoding.
// Components: None
VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];
// Encodes the floating point value as a big-endian 64-bit integer
// according to the IEEE 754 double format bit layout.
// Components: None
DOUBLE = 11 [(beam_urn) = "beam:coder:double:v1"];
// Encodes an iterable of elements.
//
// The encoding for an iterable [e1...eN] of known length N is
//
// fixed32(N)
// encode(e1) encode(e2) encode(e3) ... encode(eN)
//
// If the length is unknown, it is batched up into groups of size b1..bM
// and encoded as
//
// fixed32(-1)
// varInt64(b1) encode(e1) encode(e2) ... encode(e_b1)
// varInt64(b2) encode(e_(b1+1)) encode(e_(b1+2)) ... encode(e_(b1+b2))
// ...
// varInt64(bM) encode(e_(N-bM+1)) encode(e_(N-bM+2)) ... encode(eN)
// varInt64(0)
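//
// As an illustrative sketch, assuming beam:coder:varint:v1 as the element
// coder and a big-endian fixed32 length, the known-length iterable [1, 2, 300]
// would encode as
//
// 00 00 00 03 01 02 AC 02
//
// i.e. fixed32(3) followed by varint(1), varint(2) and varint(300).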
//
// Components: Coder for a single element.
ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];
// Encodes a timer containing a user key, a dynamic timer tag, a clear bit,
// a fire timestamp, a hold timestamp, the windows and the paneinfo.
// The encoding is represented as:
// user key - user defined key, uses the component coder.
// dynamic timer tag - a string which identifies a timer.
// windows - uses component coders.
// clear bit - a boolean set for clearing the timer.
// fire timestamp - a big endian 8 byte integer representing millis-since-epoch.
// The encoded representation is shifted so that the byte representation of
// negative values are lexicographically ordered before the byte representation
// of positive values. This is typically done by subtracting -9223372036854775808
// from the value and encoding it as a signed big endian integer. Example values:
//
// -9223372036854775808: 00 00 00 00 00 00 00 00
// -255: 7F FF FF FF FF FF FF 01
// -1: 7F FF FF FF FF FF FF FF
// 0: 80 00 00 00 00 00 00 00
// 1: 80 00 00 00 00 00 00 01
// 256: 80 00 00 00 00 00 01 00
// 9223372036854775807: FF FF FF FF FF FF FF FF
// hold timestamp - similar to the fire timestamp.
// paneinfo - similar to the paneinfo of the windowed_value.
// Components: Coder for the key and windows.
TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];
/*
* The following coders are typically not specified manually by the user,
* but are used at runtime and must be supported by every SDK.
*/
// Components: None
INTERVAL_WINDOW = 5 [(beam_urn) = "beam:coder:interval_window:v1"];
// Components: The coder to attach a length prefix to
LENGTH_PREFIX = 6 [(beam_urn) = "beam:coder:length_prefix:v1"];
// Components: None
GLOBAL_WINDOW = 7 [(beam_urn) = "beam:coder:global_window:v1"];
// Encodes an element, the windows it is in, the timestamp of the element,
// and the pane of the element. The encoding is represented as:
// timestamp windows pane element
// timestamp - A big endian 8 byte integer representing millis-since-epoch.
// The encoded representation is shifted so that the byte representation
// of negative values are lexicographically ordered before the byte
// representation of positive values. This is typically done by
// subtracting -9223372036854775808 from the value and encoding it as a
// signed big endian integer. Example values:
//
// -9223372036854775808: 00 00 00 00 00 00 00 00
// -255: 7F FF FF FF FF FF FF 01
// -1: 7F FF FF FF FF FF FF FF
// 0: 80 00 00 00 00 00 00 00
// 1: 80 00 00 00 00 00 00 01
// 256: 80 00 00 00 00 00 01 00
// 9223372036854775807: FF FF FF FF FF FF FF FF
//
// windows - The windows are encoded using the beam:coder:iterable:v1
// format, where the windows are encoded using the supplied window
// coder.
//
// pane - The first byte of the pane info determines which type of
// encoding is used, as well as the is_first, is_last, and timing
// fields. If this byte is bits [0 1 2 3 4 5 6 7], then:
// * bits [0 1 2 3] determine the encoding as follows:
// 0000 - The entire pane info is encoded as a single byte.
// The is_first, is_last, and timing fields are encoded
// as below, and the index and non-speculative index are
// both zero (and hence are not encoded here).
// 0001 - The pane info is encoded as this byte plus a single
// VarInt encoded integer representing the pane index. The
// non-speculative index can be derived as follows:
// -1 if the pane is early, otherwise equal to index.
// 0010 - The pane info is encoded as this byte plus two VarInt
// encoded integers representing the pane index and
// non-speculative index respectively.
// * bits [4 5] encode the timing as follows:
// 00 - early
// 01 - on time
// 10 - late
// 11 - unknown
// * bit 6 is 1 if this is the first pane, 0 otherwise.
// * bit 7 is 1 if this is the last pane, 0 otherwise.
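//
// For example, a pane that is both the first and the last pane of its
// window, fired on time with index 0, uses encoding 0000, so the bits
// [0 1 2 3 4 5 6 7] are [0 0 0 0 0 1 1 1] and the entire pane info is a
// single byte.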
//
// element - The element encoded using the supplied element coder.
//
// Components: The element coder and the window coder, in that order.
WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"];
// A windowed value coder with parameterized timestamp, windows and pane info.
// Encodes an element with only the value of the windowed value.
// Decodes the value and assigns the parameterized timestamp, windows and pane info to the
// windowed value.
// Components: The element coder and the window coder, in that order
// The payload of this coder is an encoded windowed value using the
// beam:coder:windowed_value:v1 coder parameterized by a beam:coder:bytes:v1
// element coder and the window coder that this param_windowed_value coder uses.
PARAM_WINDOWED_VALUE = 14 [(beam_urn) = "beam:coder:param_windowed_value:v1"];
// Encodes an iterable of elements, some of which may be stored elsewhere.
//
// The encoding for a state-backed iterable is the same as that for
// an iterable, but the final varInt64(0) terminating the set of batches
// may instead be replaced by
//
// varInt64(-1)
// varInt64(len(token))
// token
//
// where token is an opaque byte string that can be used to fetch the
// remainder of the iterable (e.g. over the state API).
//
// Components: Coder for a single element.
// Experimental.
STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
// Additional Standard Coders
// --------------------------
// The following coders are not required to be implemented for an SDK or
// runner to support the Beam model, but enable users to take advantage of
// schema-aware transforms.
// Encodes a "row", an element with a known schema, defined by an
// instance of Schema from schema.proto.
//
// A row is encoded as the concatenation of:
// - The number of attributes in the schema, encoded with
// beam:coder:varint:v1. This makes it possible to detect certain
// allowed schema changes (appending or removing columns) in
// long-running streaming pipelines.
// - A byte array representing a packed bitset indicating null fields (a
// 1 indicating a null) encoded with beam:coder:bytes:v1. The unused
// bits in the last byte must be set to 0. If there are no nulls an
// empty byte array is encoded.
// The two-byte bitset (not including the length-prefix) for the row
// [NULL, 0, 0, 0, NULL, 0, 0, NULL, 0, NULL] would be
// [0b10010001, 0b00000010]
// - An encoding for each non-null field, concatenated together.
//
// Schema types are mapped to coders as follows:
// AtomicType:
// BYTE: not yet a standard coder (BEAM-7996)
// INT16: not yet a standard coder (BEAM-7996)
// INT32: beam:coder:varint:v1
// INT64: beam:coder:varint:v1
// FLOAT: not yet a standard coder (BEAM-7996)
// DOUBLE: beam:coder:double:v1
// STRING: beam:coder:string_utf8:v1
// BOOLEAN: beam:coder:bool:v1
// BYTES: beam:coder:bytes:v1
// ArrayType: beam:coder:iterable:v1 (always has a known length)
// MapType: not a standard coder, specification defined below.
// RowType: beam:coder:row:v1
// LogicalType: Uses the coder for its representation.
//
// The MapType is encoded by:
// - An INT32 representing the size of the map (N)
// - Followed by N interleaved keys and values, encoded with their
// corresponding coder.
//
// Nullable types in container types (ArrayType, MapType) are encoded by:
// - A one byte null indicator, 0x00 for null values, or 0x01 for present
// values.
// - For present values the null indicator is followed by the value
// encoded with its corresponding coder.
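//
// As an illustrative sketch, assume a two-field schema with an INT64 field
// "id" and a STRING field "name" (hypothetical names). The row
// (id: 1, name: "a"), which has no null fields, would then encode as:
//
// 02       attribute count 2 (beam:coder:varint:v1)
// 00       empty null-field bitset (beam:coder:bytes:v1, zero length)
// 01       id = 1 (beam:coder:varint:v1)
// 01 61    name = "a" (beam:coder:string_utf8:v1, length then UTF-8 bytes)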
//
// The payload for RowCoder is an instance of Schema.
// Components: None
// Experimental.
ROW = 13 [(beam_urn) = "beam:coder:row:v1"];
}
}
// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
// TODO: consider inlining field on PCollection
message WindowingStrategy {
// (Required) The FunctionSpec of the UDF that assigns windows,
// merges windows, and shifts timestamps before they are
// combined according to the OutputTime.
FunctionSpec window_fn = 1;
// (Required) Whether or not the window fn is merging.
//
// This knowledge is required for many optimizations.
MergeStatus.Enum merge_status = 2;
// (Required) The coder for the windows of this PCollection.
string window_coder_id = 3;
// (Required) The trigger to use when grouping this PCollection.
Trigger trigger = 4;
// (Required) The accumulation mode indicates whether new panes are a full
// replacement for prior panes or whether they are deltas to be combined
// with other panes (the combine should correspond to whatever the upstream
// grouping transform is).
AccumulationMode.Enum accumulation_mode = 5;
// (Required) The OutputTime specifies, for a grouping transform, how to
// compute the aggregate timestamp. The window_fn will first possibly shift
// it later, then the OutputTime takes the max, min, or ignores it and takes
// the end of window.
//
// This is actually only for input to grouping transforms, but since they
// may be introduced in runner-specific ways, it is carried along with the
// windowing strategy.
OutputTime.Enum output_time = 6;
// (Required) Indicate when output should be omitted upon window expiration.
ClosingBehavior.Enum closing_behavior = 7;
// (Required) The duration, in milliseconds, beyond the end of a window at
// which the window becomes droppable.
int64 allowed_lateness = 8;
// (Required) Indicate whether empty on-time panes should be omitted.
OnTimeBehavior.Enum OnTimeBehavior = 9;
// (Required) Whether or not the window fn assigns inputs to exactly one window
//
// This knowledge is required for some optimizations
bool assigns_to_one_window = 10;
// (Optional) The environment in which the current window_fn should be applied.
// A runner that executes the pipeline may choose to override this if needed.
// If not specified, the environment will be decided by the runner.
string environment_id = 11;
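//
// As an illustrative sketch (proto text format, with a hypothetical
// window_coder_id; the fixed-windows URN shown is the standard one from
// standard_window_fns.proto), a simple strategy over non-merging fixed
// windows might look like:
//
//   window_fn { urn: "beam:window_fn:fixed_windows:v1" payload: "<FixedWindowsPayload>" }
//   merge_status: NON_MERGING
//   window_coder_id: "interval_window_coder"
//   trigger { default {} }
//   accumulation_mode: DISCARDING
//   output_time: END_OF_WINDOW
//   closing_behavior: EMIT_IF_NONEMPTY
//   allowed_lateness: 0
//   OnTimeBehavior: FIRE_IF_NONEMPTY
//   assigns_to_one_window: true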
}
// Whether or not a PCollection's WindowFn is non-merging, merging, or
// merging-but-already-merged, in which case a subsequent GroupByKey is almost
// always going to do something the user does not want
message MergeStatus {
enum Enum {
UNSPECIFIED = 0;
// The WindowFn does not require merging.
// Examples: global window, FixedWindows, SlidingWindows
NON_MERGING = 1;
// The WindowFn is merging and the PCollection has not had merging
// performed.
// Example: Sessions prior to a GroupByKey
NEEDS_MERGE = 2;
// The WindowFn is merging and the PCollection has had merging occur
// already.
// Example: Sessions after a GroupByKey
ALREADY_MERGED = 3;
}
}
// Whether or not subsequent outputs of aggregations should be entire
// replacement values or just the aggregation of inputs received since
// the prior output.
message AccumulationMode {
enum Enum {
UNSPECIFIED = 0;
// The aggregation is discarded when it is output
DISCARDING = 1;
// The aggregation is accumulated across outputs
ACCUMULATING = 2;
// The aggregation emits retractions when it is output
RETRACTING = 3;
}
}
// Controls whether or not an aggregating transform should output data
// when a window expires.
message ClosingBehavior {
enum Enum {
UNSPECIFIED = 0;
// Emit output when a window expires, whether or not there has been
// any new data since the last output.
EMIT_ALWAYS = 1;
// Only emit output when new data has arrived since the last output
EMIT_IF_NONEMPTY = 2;
}
}
// Controls whether or not an aggregating transform should output data
// when an on-time pane is empty.
message OnTimeBehavior {
enum Enum {
UNSPECIFIED = 0;
// Always fire the on-time pane. Even if there is no new data since
// the previous firing, an element will be produced.
FIRE_ALWAYS = 1;
// Only fire the on-time pane if there is new data since the previous firing.
FIRE_IF_NONEMPTY = 2;
}
}
// When a number of windowed, timestamped inputs are aggregated, the timestamp
// for the resulting output.
message OutputTime {
enum Enum {
UNSPECIFIED = 0;
// The output has the timestamp of the end of the window.
END_OF_WINDOW = 1;
// The output has the latest timestamp of the input elements since
// the last output.
LATEST_IN_PANE = 2;
// The output has the earliest timestamp of the input elements since
// the last output.
EARLIEST_IN_PANE = 3;
}
}
// The different time domains in the Beam model.
message TimeDomain {
enum Enum {
UNSPECIFIED = 0;
// Event time is time from the perspective of the data
EVENT_TIME = 1;
// Processing time is time from the perspective of the
// execution of your pipeline
PROCESSING_TIME = 2;
// Synchronized processing time is the minimum of the
// processing time of all pending elements.
//
// The "processing time" of an element refers to
// the local processing time at which it was emitted
SYNCHRONIZED_PROCESSING_TIME = 3;
}
}
// A small DSL for expressing when to emit new aggregations
// from a GroupByKey or CombinePerKey
//
// A trigger is described in terms of when it is _ready_ to permit output.
message Trigger {
// Ready when all subtriggers are ready.
message AfterAll {
repeated Trigger subtriggers = 1;
}
// Ready when any subtrigger is ready.
message AfterAny {
repeated Trigger subtriggers = 1;
}
// Starting with the first subtrigger, ready when the _current_ subtrigger
// is ready. After output, advances the current trigger by one.
message AfterEach {
repeated Trigger subtriggers = 1;
}
// Ready after the input watermark is past the end of the window.
//
// May have implicitly-repeated subtriggers for early and late firings.
// When the end of the window is reached, the trigger transitions between
// the subtriggers.
message AfterEndOfWindow {
// (Optional) A trigger governing output prior to the end of the window.
Trigger early_firings = 1;
// (Optional) A trigger governing output after the end of the window.
Trigger late_firings = 2;
}
// After input arrives, ready when the specified delay has passed.
message AfterProcessingTime {
// (Required) The transforms to apply to an arriving element's timestamp,
// in order
repeated TimestampTransform timestamp_transforms = 1;
}
// Ready whenever the upstream processing time has caught up with
// the arrival time of an input element
message AfterSynchronizedProcessingTime {
}
// The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
// specially denoted to indicate the user did not alter the triggering.
message Default {
}
// Ready whenever the requisite number of input elements have arrived
message ElementCount {
int32 element_count = 1;
}
// Never ready. There will only be an ON_TIME output and a final
// output at window expiration.
message Never {
}
// Always ready. This can also be expressed as ElementCount(1) but
// is more explicit.
message Always {
}
// Ready whenever either of its subtriggers are ready, but finishes output
// when the finally subtrigger fires.
message OrFinally {
// (Required) Trigger governing main output; may fire repeatedly.
Trigger main = 1;
// (Required) Trigger governing termination of output.
Trigger finally = 2;
}
// Ready whenever the subtrigger is ready; resets state when the subtrigger
// completes.
message Repeat {
// (Required) Trigger that is run repeatedly.
Trigger subtrigger = 1;
}
// The full disjoint union of possible triggers.
oneof trigger {
AfterAll after_all = 1;
AfterAny after_any = 2;
AfterEach after_each = 3;
AfterEndOfWindow after_end_of_window = 4;
AfterProcessingTime after_processing_time = 5;
AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
Always always = 12;
Default default = 7;
ElementCount element_count = 8;
Never never = 9;
OrFinally or_finally = 10;
Repeat repeat = 11;
}
}
// A specification for a transformation on a timestamp.
//
// Primarily used by AfterProcessingTime triggers to transform
// the arrival time of input to a target time for firing.
message TimestampTransform {
oneof timestamp_transform {
Delay delay = 1;
AlignTo align_to = 2;
}
message Delay {
// (Required) The delay, in milliseconds.
int64 delay_millis = 1;
}
message AlignTo {
// (Required) A duration to which delays should be quantized
// in milliseconds.
int64 period = 3;
// (Required) An offset from 0 for the quantization specified by
// `period`, in milliseconds
int64 offset = 4;
}
}
// A specification for how to "side input" a PCollection.
message SideInput {
// (Required) URN of the access pattern required by the `view_fn` to present
// the desired SDK-specific interface to a UDF.
//
// This access pattern defines the SDK harness <-> Runner Harness RPC
// interface for accessing a side input.
//
// The only access pattern intended for Beam, because of its superior
// performance possibilities, is "beam:sideinput:multimap" (or some such
// URN)
FunctionSpec access_pattern = 1;
// (Required) The FunctionSpec of the UDF that adapts a particular
// access_pattern to a user-facing view type.
//
// For example, View.asSingleton() may include a `view_fn` that adapts a
// specially-designed multimap to a single value per window.
FunctionSpec view_fn = 2;
// (Required) The FunctionSpec of the UDF that maps a main input window
// to a side input window.
//
// For example, when the main input is in fixed windows of one hour, this
// can specify that the side input should be accessed according to the day
// in which that hour falls.
FunctionSpec window_mapping_fn = 3;
}
message StandardArtifacts {
enum Types {
// A URN for locally-accessible artifact files.
// payload: ArtifactFilePayload
FILE = 0 [(beam_urn) = "beam:artifact:type:file:v1"];
// A URN for artifacts described by URLs.
// payload: ArtifactUrlPayload
URL = 1 [(beam_urn) = "beam:artifact:type:url:v1"];
// A URN for artifacts embedded in ArtifactInformation proto.
// payload: EmbeddedFilePayload.
EMBEDDED = 2 [(beam_urn) = "beam:artifact:type:embedded:v1"];
// A URN for Python artifacts hosted on PyPI.
// payload: PyPIPayload
PYPI = 3 [(beam_urn) = "beam:artifact:type:pypi:v1"];
// A URN for Java artifacts hosted on a Maven repository.
// payload: MavenPayload
MAVEN = 4 [(beam_urn) = "beam:artifact:type:maven:v1"];
// A URN for deferred artifacts.
// payload: DeferredArtifactPayload
DEFERRED = 5 [(beam_urn) = "beam:artifact:type:deferred:v1"];
}
enum Roles {
// A URN for staging-to role.
// payload: ArtifactStagingToRolePayload
STAGING_TO = 0 [(beam_urn) = "beam:artifact:role:staging_to:v1"];
}
}
message ArtifactFilePayload {
// a string for an artifact file path e.g. "/tmp/foo.jar"
string path = 1;
// The hex-encoded sha256 checksum of the artifact.
string sha256 = 2;
}
message ArtifactUrlPayload {
// a string for an artifact URL e.g. "https://.../foo.jar" or "gs://tmp/foo.jar"
string url = 1;
}
message EmbeddedFilePayload {
// raw data bytes for an embedded artifact
bytes data = 1;
}
message PyPIPayload {
// Pypi compatible artifact id e.g. "apache-beam"
string artifact_id = 1;
// Pypi compatible version string.
string version = 2;
}
message MavenPayload {
// A string specifying Maven artifact.
// The standard format is "groupId:artifactId:version[:packaging[:classifier]]"
string artifact = 1;
// (Optional) Repository URL. If not specified, Maven central is used by default.
string repository_url = 2;
}
message DeferredArtifactPayload {
// A unique string identifier assigned by the creator of this payload. The creator may use this key to confirm
// whether they can parse the data.
string key = 1;
// Data for deferred artifacts. Interpretation of bytes is delegated to the creator of this payload.
bytes data = 2;
}
message ArtifactStagingToRolePayload {
// A generated staged name (relative path under staging directory).
string staged_name = 1;
}
message ArtifactInformation {
// A URN that describes the type of artifact
string type_urn = 1;
bytes type_payload = 2;
// A URN that describes the role of artifact
string role_urn = 3;
bytes role_payload = 4;
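// For example (illustrative), a locally staged jar could have
// type_urn "beam:artifact:type:file:v1" with a type_payload that is an
// encoded ArtifactFilePayload, and role_urn "beam:artifact:role:staging_to:v1"
// with a role_payload that is an encoded ArtifactStagingToRolePayload naming
// the staged file.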
}
// An environment for executing UDFs. By default, an SDK container URL, but
// can also be a process forked by a command, or an externally managed process.
message Environment {
// (Required) The URN of the payload
string urn = 2;
// (Optional) The data specifying any parameters to the URN. If
// the URN does not require any arguments, this may be omitted.
bytes payload = 3;
// (Optional) Static display data for the environment. If there is none,
// it may be omitted.
repeated DisplayData display_data = 4;
// (Optional) A set of capabilities this environment supports. This is
// typically a list of common URNs designating coders, transforms, etc. that
// this environment understands (and a runner MAY use) despite not
// appearing in the pipeline proto. This may also be used to indicate
// support of optional protocols not tied to a concrete component.
repeated string capabilities = 5;
// (Optional) artifact dependency information used for executing UDFs in this environment.
repeated ArtifactInformation dependencies = 6;
reserved 1;
}
message StandardEnvironments {
enum Environments {
DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"]; // A managed docker container to run user code.
PROCESS = 1 [(beam_urn) = "beam:env:process:v1"]; // A managed native process to run user code.
EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"]; // An external, non-managed process to run user code.
}
}
// The payload of a Docker image
message DockerPayload {
string container_image = 1; // implicitly linux_amd64.
}
message ProcessPayload {
string os = 1; // "linux", "darwin", ..
string arch = 2; // "amd64", ..
string command = 3; // process to execute
map<string, string> env = 4; // Environment variables
}
message ExternalPayload {
ApiServiceDescriptor endpoint = 1;
map<string, string> params = 2; // Arbitrary extra parameters to pass
}
// These URNs are used to indicate capabilities of environments that cannot
// simply be expressed as a component (such as a Coder or PTransform) that this
// environment understands.
message StandardProtocols {
enum Enum {
// Indicates support for progress reporting via the legacy Metrics proto.
LEGACY_PROGRESS_REPORTING = 0 [(beam_urn) = "beam:protocol:progress_reporting:v0"];
// Indicates support for progress reporting via the new MonitoringInfo proto.
PROGRESS_REPORTING = 1 [(beam_urn) = "beam:protocol:progress_reporting:v1"];
// Indicates support for the worker status protocol defined at
// https://s.apache.org/beam-fn-api-harness-status.
WORKER_STATUS = 2 [(beam_urn) = "beam:protocol:worker_status:v1"];
// Indicates this SDK can take advantage of multiple cores when processing
// concurrent process bundle requests. (Note that all SDKs must process
// an unbounded number of concurrent process bundle requests; this capability
// simply indicates this SDK can actually parallelize the work across multiple
// cores.)
MULTI_CORE_BUNDLE_PROCESSING = 3 [(beam_urn) = "beam:protocol:multi_core_bundle_processing:v1"];
}
}
// These URNs are used to indicate requirements of a pipeline that cannot
// simply be expressed as a component (such as a Coder or PTransform) that the
// runner must understand. In many cases, this indicates a particular field
// of a transform must be inspected and respected (which allows new fields
// to be added in a forwards-compatible way).
message StandardRequirements {
enum Enum {
// This requirement indicates the state_spec and time_spec fields of ParDo
// transform payloads must be inspected.
REQUIRES_STATEFUL_PROCESSING = 0 [(beam_urn) = "beam:requirement:pardo:stateful:v1"];
// This requirement indicates the requests_finalization field of ParDo
// transform payloads must be inspected.
REQUIRES_BUNDLE_FINALIZATION = 1 [(beam_urn) = "beam:requirement:pardo:finalization:v1"];
// This requirement indicates the requires_stable_input field of ParDo
// transform payloads must be inspected.
REQUIRES_STABLE_INPUT = 2 [(beam_urn) = "beam:requirement:pardo:stable_input:v1"];
// This requirement indicates the requires_time_sorted_input field of ParDo
// transform payloads must be inspected.
REQUIRES_TIME_SORTED_INPUT = 3 [(beam_urn) = "beam:requirement:pardo:time_sorted_input:v1"];
// This requirement indicates the restriction_coder_id field of ParDo
// transform payloads must be inspected.
REQUIRES_SPLITTABLE_DOFN = 4 [(beam_urn) = "beam:requirement:pardo:splittable_dofn:v1"];
}
}
extend google.protobuf.EnumValueOptions {
// An extension to be used for specifying the standard URN of various
// pipeline entities, e.g. transforms, functions, coders etc.
// Code should refer to the URNs of those entities by extracting
// it from the (beam_urn) extension, rather than by hard-coding
// the URN.
//
// The recommended pattern for declaring it is (exemplified by coders):
//
// message StandardCoders {
// enum Enum {
// BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];
// ...
// }
// }
//
// If there are multiple categories of entities of this type, use the
// following pattern (exemplified by PTransforms):
//
// message StandardPTransforms {
// enum Primitives {
// ...
// }
// enum Composites {
// ...
// }
// }
string beam_urn = 185324356;
// A value to store other constants
string beam_constant = 185324357;
}
// A URN along with a parameter object whose schema is determined by the
// URN.
//
// This structure is reused in two distinct, but compatible, ways:
//
// 1. This can be a specification of the function over PCollections
// that a PTransform computes.
// 2. This can be a specification of a user-defined function, possibly
// SDK-specific. (Context external to this message must be adequate to
// indicate the environment in which the UDF can be understood.)
//
// Though not explicit in this proto, there are two possibilities
// for the relationship of a runner to this specification that
// one should bear in mind:
//
// 1. The runner understands the URN. For example, it might be
// a well-known URN like "beam:transform:Top" or
// "beam:window_fn:FixedWindows" with
// an agreed-upon payload (e.g. a number or duration,
// respectively).
// 2. The runner does not understand the URN. It might be an
// SDK specific URN such as "beam:dofn:javasdk:1.0"
// that indicates to the SDK what the payload is,
// such as a serialized Java DoFn from a particular
// version of the Beam Java SDK. The payload will often
// then be an opaque message such as bytes in a
// language-specific serialization format.
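//
// For example, the spec of a primitive ParDo application has the URN
// "beam:transform:pardo:v1" and a serialized ParDoPayload as its payload,
// while an SDK-specific DoFn might be described as (illustrative sketch,
// proto text format):
//
//   urn: "beam:dofn:javasdk:1.0"
//   payload: "<opaque serialized Java DoFn>"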
message FunctionSpec {
// (Required) A URN that describes the accompanying payload.
// For any URN that is not recognized (by whoever is inspecting
// it) the parameter payload should be treated as opaque and
// passed as-is.
string urn = 1;
// (Optional) The data specifying any parameters to the URN. If
// the URN does not require any arguments, this may be omitted.
bytes payload = 3;
}
// A set of well known URNs describing display data.
//
// All descriptions must contain how the value should be classified and how it
// is encoded. Note that some types are logical types which convey contextual
// information about the pipeline in addition to an encoding while others only
// specify the encoding itself.
message StandardDisplayData {
enum DisplayData {
// A string label and value. Has a payload containing an encoded
// LabelledStringPayload.
LABELLED_STRING = 0 [(beam_urn) = "beam:display_data:labelled_string:v1"];
// Some samples that are being considered to become standard display data
// types follow:
// A path/location of a resource that this transform accesses. Encoded as
// the well known google.protobuf.StringValue type.
// INPUT_PATH = 1 [(beam_urn) = "beam:display_data:input_path:v1"];
// A timestamp encoded as the well known protobuf type
// google.protobuf.timestamp. Note that the value may be outside of the
// RFC3339 limit imposed.
// TIMESTAMP = 2 [(beam_urn) = "beam:display_data:timestamp:v1"];
// A duration encoded as the well known protobuf type
// google.protobuf.duration. Note that the value may be outside of the
// RFC3339 limit imposed.
// DURATION = 3 [(beam_urn) = "beam:display_data:duration:v1"];
}
}
message LabelledStringPayload {
// (Required) A human readable label for the value.
string label = 1;
// (Required) A value which will be displayed to the user. The urn describes
// how the value can be interpreted and/or categorized.
string value = 2;
}
// Static display data associated with a pipeline component. Display data is
// useful for pipeline runners, IOs, and diagnostic dashboards to display details
// about annotated components.
message DisplayData {
// A key used to describe the type of display data. See StandardDisplayData
// for the set of well known urns describing how the payload is meant to be
// interpreted.
string urn = 1;
// (Optional) The data specifying any parameters to the URN. If
// the URN does not require any arguments, this may be omitted.
bytes payload = 2;
}
// The following transforms are not part of the RunnerApi specification,
// but may be useful for graph construction and manipulation.
// A disjoint union of all the things that may contain references
// that require Components to resolve.
message MessageWithComponents {
// (Optional) The by-reference components of the root message,
// enabling a standalone message.
//
// If this is absent, it is expected that there are no
// references.
Components components = 1;
// (Required) The root message that may contain pointers
// that should be resolved by looking inside components.
oneof root {
Coder coder = 2;
CombinePayload combine_payload = 3;
FunctionSpec function_spec = 4;
ParDoPayload par_do_payload = 6;
PTransform ptransform = 7;
PCollection pcollection = 8;
ReadPayload read_payload = 9;
SideInput side_input = 11;
WindowIntoPayload window_into_payload = 12;
WindowingStrategy windowing_strategy = 13;
}
}
// The payload for an executable stage. This will eventually be passed to an SDK in the form of a
// ProcessBundleDescriptor.
message ExecutableStagePayload {
// (Required) Environment in which this stage executes.
//
// We use an environment rather than environment id
// because ExecutableStages use environments directly. This may change in the future.
Environment environment = 1;
// The wire coder settings of this executable stage
repeated WireCoderSetting wire_coder_settings = 9;
// (Required) Input PCollection id. This must be present as a value in the inputs of any
// PTransform the ExecutableStagePayload is the payload of.
string input = 2;
// The side inputs required for this executable stage. Each side input of each PTransform within
// this ExecutableStagePayload must be represented within this field.
repeated SideInputId side_inputs = 3;
// PTransform ids contained within this executable stage. This must contain at least one
// PTransform id.
repeated string transforms = 4;
// Output PCollection ids. This must be equal to the values of the outputs of any
// PTransform the ExecutableStagePayload is the payload of.
repeated string outputs = 5;
// (Required) The components for the Executable Stage. This must contain all of the Transforms
// in transforms, and the closure of all of the components they recognize.
Components components = 6;
// The user states required for this executable stage. Each user state of each PTransform within
// this ExecutableStagePayload must be represented within this field.
repeated UserStateId user_states = 7;
// The timers required for this executable stage. Each timer of each PTransform within
// this ExecutableStagePayload must be represented within this field.
repeated TimerId timers = 8;
// The timer families required for this executable stage. Each timer family of each PTransform within
// this ExecutableStagePayload must be represented within this field.
repeated TimerFamilyId timerFamilies = 10;
// A reference to a side input. Side inputs are uniquely identified by PTransform id and
// local name.
message SideInputId {
// (Required) The id of the PTransform that references this side input.
string transform_id = 1;
// (Required) The local name of this side input from the PTransform that references it.
string local_name = 2;
}
// A reference to user state. User states are uniquely identified by PTransform id and
// local name.
message UserStateId {
// (Required) The id of the PTransform that references this user state.
string transform_id = 1;
// (Required) The local name of this user state for the PTransform that references it.
string local_name = 2;
}
// A reference to a timer. Timers are uniquely identified by PTransform id and
// local name.
message TimerId {
// (Required) The id of the PTransform that references this timer.
string transform_id = 1;
// (Required) The local name of this timer for the PTransform that references it.
string local_name = 2;
}
// A reference to a timer family. Timer families are uniquely identified by PTransform id and
// local name.
message TimerFamilyId {
// (Required) The id of the PTransform that references this timer family.
string transform_id = 1;
// (Required) The local name of this timer family for the PTransform that references it.
string local_name = 2;
}
// Settings that decide the coder type of wire coder.
message WireCoderSetting {
// (Required) The URN of the wire coder.
// Note that only the windowed value coder and the parameterized windowed value coder are supported.
string urn = 1;
// (Optional) The data specifying any parameters to the URN. If
// the URN is beam:coder:windowed_value:v1, this may be omitted. If the URN is
// beam:coder:param_windowed_value:v1, the payload is an encoded windowed
// value using the beam:coder:windowed_value:v1 coder parameterized by
// a beam:coder:bytes:v1 element coder and the window coder that this
// param_windowed_value coder uses.
bytes payload = 2;
// (Required) The target (PCollection or Timer) this setting applies to.
oneof target {
// The input or output PCollection id this setting applies to.
string input_or_output_id = 3;
// The timer id this setting applies to.
TimerId timer = 4;
}
}
}