/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
/*
 * Protocol Buffers describing the Runner API, which is the runner-independent,
 * SDK-independent definition of the Beam model.
 */
| |
syntax = "proto3";

package org.apache.beam.model.pipeline.v1;

// Imports come before file-level options, following the canonical protobuf
// file layout. No import has been added or removed.
import "endpoints.proto";
import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";

option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";
| |
// Constants of the Beam model. Each value is carried as a string in the
// (beam_constant) enum value option.
message BeamConstants {
  enum Constants {
    // Minimum timestamp.
    // All timestamps are in milliseconds since Jan 1, 1970.
    MIN_TIMESTAMP_MILLIS = 0 [(beam_constant) = "-9223372036854775"];

    // Maximum timestamp, in milliseconds since Jan 1, 1970.
    MAX_TIMESTAMP_MILLIS = 1 [(beam_constant) = "9223372036854775"];

    // The maximum timestamp for the global window.
    //
    // Triggers use maxTimestamp to set the timestamp of their timers, and a
    // timer fires when the watermark passes its timestamp, so this value has
    // to be smaller than MAX_TIMESTAMP_MILLIS. One standard day is subtracted
    // from MAX_TIMESTAMP_MILLIS so that the value stays below
    // MAX_TIMESTAMP_MILLIS even after rounding up to seconds or minutes.
    // See also GlobalWindow in the Java SDK.
    GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS = 2
        [(beam_constant) = "9223371950454775"];
  }
}
| |
// Mappings from id to message. Any proto message that may contain references
// needing resolution includes this as an optional field.
message Components {
  // (Required) PTransforms, keyed by pipeline-scoped id.
  map<string, PTransform> transforms = 1;

  // (Required) PCollections, keyed by pipeline-scoped id.
  map<string, PCollection> pcollections = 2;

  // (Required) WindowingStrategies, keyed by pipeline-scoped id.
  map<string, WindowingStrategy> windowing_strategies = 3;

  // (Required) Coders, keyed by pipeline-scoped id.
  map<string, Coder> coders = 4;

  // (Required) Environments, keyed by pipeline-scoped id.
  map<string, Environment> environments = 5;
}
| |
// A hierarchical graph of PTransforms that are linked by PCollections.
//
// The graph is stored as by-reference maps to nodes, PCollections, SDK
// environments, UDFs, etc., which allows compact reuse and arbitrary graph
// structure.
//
// The map keys are arbitrary strings; they only have to be internally
// consistent within this proto message.
message Pipeline {
  // (Required) The coders, UDFs, graph nodes, etc., that make up this
  // pipeline.
  Components components = 1;

  // (Required) The ids of all PTransforms that are not contained within
  // another PTransform. They must be in shallow topological order, so that
  // traversing them recursively in this order yields a recursively
  // topological traversal.
  repeated string root_transform_ids = 2;

  // (Optional) Static display data for the pipeline. May be omitted if there
  // is none.
  DisplayData display_data = 3;
}
| |
// An applied PTransform! This does not contain the graph data, but only the
// fields specific to a graph node that is a Runner API transform
// between PCollections.
message PTransform {
  // (Required) A unique name for the application node.
  //
  // Ideally, this should be stable over multiple evolutions of a pipeline
  // for the purposes of logging and associating pipeline state with a node,
  // etc.
  //
  // If it is not stable, then the runner decides what will happen. But, most
  // importantly, it must always be here and be unique, even if it is
  // autogenerated.
  string unique_name = 5;

  // (Optional) A URN and payload that, together, fully define the semantics
  // of this transform.
  //
  // If absent, this must be an "anonymous" composite transform.
  //
  // For primitive transforms in the Runner API, this is required, and the
  // payloads are well-defined messages. When the URN indicates ParDo it
  // is a ParDoPayload, and so on.
  //
  // TODO: document the standardized URNs and payloads
  // TODO: separate standardized payloads into a separate proto file
  //
  // For some special composite transforms, the payload is also officially
  // defined:
  //
  //  - when the URN is "beam:transforms:combine" it is a CombinePayload
  //
  // NOTE(review): the combine URNs listed in StandardPTransforms.Composites
  // have the form "beam:transform:combine_per_key:v1"; confirm whether the
  // example URN above is still current.
  FunctionSpec spec = 1;

  // (Optional) If this node is a composite, a list of the ids of
  // transforms that it contains.
  repeated string subtransforms = 2;

  // (Required) A map from local names of inputs (unique only within this map,
  // and likely embedded in the transform payload and serialized user code) to
  // PCollection ids.
  //
  // The payload for this transform may clarify the relationship of these
  // inputs. For example:
  //
  //  - for a Flatten transform they are merged
  //  - for a ParDo transform, some may be side inputs
  //
  // All inputs are recorded here so that the topological ordering of
  // the graph is consistent whether or not the payload is understood.
  map<string, string> inputs = 3;

  // (Required) A map from local names of outputs (unique only within this map,
  // and likely embedded in the transform payload and serialized user code)
  // to PCollection ids.
  //
  // The URN or payload for this transform node may clarify the type and
  // relationship of these outputs. For example:
  //
  //  - for a ParDo transform, these are tags on PCollections, which will be
  //    embedded in the DoFn.
  map<string, string> outputs = 4;

  // (Optional) Static display data for this PTransform application. If
  // there is none, or it is not relevant (such as use by the Fn API)
  // then it may be omitted.
  DisplayData display_data = 6;
}
| |
// URNs of the standard transforms, grouped by category. Each URN is carried
// in the (beam_urn) enum value option.
message StandardPTransforms {
  enum Primitives {
    // Represents Beam's parallel do operation.
    // Payload: ParDoPayload.
    PAR_DO = 0 [(beam_urn) = "beam:transform:pardo:v1"];

    // Represents Beam's flatten operation.
    // Payload: None.
    FLATTEN = 1 [(beam_urn) = "beam:transform:flatten:v1"];

    // Represents Beam's group-by-key operation.
    // Payload: None.
    GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];

    // Represents the operation generating a single empty element.
    IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];

    // Represents the Window.into() operation.
    // Payload: WindowIntoPayload.
    ASSIGN_WINDOWS = 4 [(beam_urn) = "beam:transform:window_into:v1"];

    // Represents the TestStream.
    // Payload: TestStreamPayload.
    TEST_STREAM = 5 [(beam_urn) = "beam:transform:teststream:v1"];

    // Represents mapping of main input window onto side input window.
    //
    // Side input window mapping function:
    //   Input: KV<nonce, MainInputWindow>
    //   Output: KV<nonce, SideInputWindow>
    //
    // For each main input window, the side input window is returned. The
    // nonce is used by a runner to associate each input with its output.
    // The nonce is represented as an opaque set of bytes.
    //
    // Payload: WindowMappingFn from SideInputSpec.
    MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];

    // Used to merge windows during a GroupByKey.
    //
    // Window merging function:
    //   Input: KV<nonce, iterable<OriginalWindow>>
    //   Output: KV<nonce,
    //              KV<iterable<UnmergedOriginalWindow>,
    //                 iterable<KV<MergedWindow,
    //                             iterable<ConsumedOriginalWindow>>>>>
    //
    // For each set of original windows, a list of all unmerged windows is
    // output alongside a map of merged window to set of consumed windows.
    // All original windows must be contained in either the unmerged original
    // window set or one of the consumed original window sets. Each original
    // window can only be part of one output set. The nonce is used by a runner
    // to associate each input with its output. The nonce is represented as an
    // opaque set of bytes.
    //
    // Payload: WindowFn from WindowingStrategy.
    MERGE_WINDOWS = 7 [(beam_urn) = "beam:transform:merge_windows:v1"];
  }

  enum DeprecatedPrimitives {
    // Represents the operation to read a Bounded or Unbounded source.
    // Payload: ReadPayload.
    READ = 0 [(beam_urn) = "beam:transform:read:v1"];

    // Runners should move away from translating `CreatePCollectionView` and
    // treat this as part of the translation for a `ParDo` side input.
    CREATE_VIEW = 1 [(beam_urn) = "beam:transform:create_view:v1"];
  }

  enum Composites {
    // Represents the Combine.perKey() operation.
    // If this is produced by an SDK, it is assumed that the SDK understands
    // each of CombineComponents.
    // Payload: CombinePayload.
    COMBINE_PER_KEY = 0 [(beam_urn) = "beam:transform:combine_per_key:v1"];

    // Represents the Combine.globally() operation.
    // If this is produced by an SDK, it is assumed that the SDK understands
    // each of CombineComponents.
    // Payload: CombinePayload.
    COMBINE_GLOBALLY = 1 [(beam_urn) = "beam:transform:combine_globally:v1"];

    // Represents the Reshuffle operation.
    RESHUFFLE = 2 [(beam_urn) = "beam:transform:reshuffle:v1"];

    // Less well-known. Payload: WriteFilesPayload.
    WRITE_FILES = 3 [(beam_urn) = "beam:transform:write_files:v1"];
  }

  // Payload for all of these: CombinePayload
  enum CombineComponents {
    // Represents the Pre-Combine part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.ta0g6ase8z07
    // Payload: CombinePayload
    COMBINE_PER_KEY_PRECOMBINE = 0
        [(beam_urn) = "beam:transform:combine_per_key_precombine:v1"];

    // Represents the Merge Accumulators part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.jco9rvatld5m
    // Payload: CombinePayload
    COMBINE_PER_KEY_MERGE_ACCUMULATORS = 1
        [(beam_urn) = "beam:transform:combine_per_key_merge_accumulators:v1"];

    // Represents the Extract Outputs part of a lifted Combine Per Key, as
    // described in the following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.i9i6p8gtl6ku
    // Payload: CombinePayload
    COMBINE_PER_KEY_EXTRACT_OUTPUTS = 2
        [(beam_urn) = "beam:transform:combine_per_key_extract_outputs:v1"];

    // Represents the Combine Grouped Values transform, as described in the
    // following document:
    // https://s.apache.org/beam-runner-api-combine-model#heading=h.aj86ew4v1wk
    // Payload: CombinePayload
    COMBINE_GROUPED_VALUES = 3
        [(beam_urn) = "beam:transform:combine_grouped_values:v1"];
  }

  // Payload for all of these: ParDoPayload containing the user's SDF
  enum SplittableParDoComponents {
    // Pairs the input element with its initial restriction.
    // Input: element; output: KV(element, restriction).
    PAIR_WITH_RESTRICTION = 0
        [(beam_urn) = "beam:transform:sdf_pair_with_restriction:v1"];

    // Splits the restriction inside an element/restriction pair.
    // Input: KV(element, restriction); output: KV(element, restriction).
    SPLIT_RESTRICTION = 1
        [(beam_urn) = "beam:transform:sdf_split_restriction:v1"];

    // Applies the DoFn to every element/restriction pair in a uniquely keyed
    // collection, in a splittable fashion.
    // Input: KV(bytes, KV(element, restriction)); output: DoFn's output.
    // The first "bytes" is an opaque unique key using the standard bytes
    // coder. Typically a runner would rewrite this into a runner-specific
    // grouping operation supporting state and timers, followed by
    // PROCESS_ELEMENTS, with some runner-specific glue code in between.
    PROCESS_KEYED_ELEMENTS = 2
        [(beam_urn) = "beam:transform:sdf_process_keyed_elements:v1"];

    // Like PROCESS_KEYED_ELEMENTS, but without the unique key - just elements
    // and restrictions.
    // Input: KV(element, restriction); output: DoFn's output.
    PROCESS_ELEMENTS = 3
        [(beam_urn) = "beam:transform:sdf_process_elements:v1"];

    // Splits the restriction of each element/restriction pair and returns the
    // resulting splits, with a corresponding floating point size estimate
    // for each.
    // A reasonable value for size is the number of bytes expected to be
    // produced by this (element, restriction) pair.
    // Input: KV(element, restriction)
    // Output: KV(KV(element, restriction), size)
    SPLIT_AND_SIZE_RESTRICTIONS = 4
        [(beam_urn) = "beam:transform:sdf_split_and_size_restrictions:v1"];

    // Like PROCESS_ELEMENTS, but accepts the sized output produced by
    // SPLIT_AND_SIZE_RESTRICTIONS.
    // Input: KV(KV(element, restriction), size); output: DoFn's output.
    PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 5
        [(beam_urn) =
             "beam:transform:sdf_process_sized_element_and_restrictions:v1"];
  }
}
| |
// URNs of the standard side input types, carried in the (beam_urn) enum
// value option.
message StandardSideInputTypes {
  enum Enum {
    // Side input identified by the iterable URN.
    ITERABLE = 0 [(beam_urn) = "beam:side_input:iterable:v1"];

    // Side input identified by the multimap URN.
    MULTIMAP = 1 [(beam_urn) = "beam:side_input:multimap:v1"];
  }
}
| |
// A PCollection!
message PCollection {
  // (Required) A unique name for the PCollection.
  //
  // Ideally, this should be stable over multiple evolutions of a pipeline
  // for the purposes of logging and associating pipeline state with a node,
  // etc.
  //
  // If it is not stable, then the runner decides what will happen. But, most
  // importantly, it must always be here, even if it is autogenerated.
  string unique_name = 1;

  // (Required) The id of the Coder for this PCollection.
  string coder_id = 2;

  // (Required) Whether this PCollection is bounded or unbounded.
  IsBounded.Enum is_bounded = 3;

  // (Required) The id of the windowing strategy for this PCollection.
  string windowing_strategy_id = 4;

  // (Optional) Static display data for this PCollection. If there is none,
  // or it is not relevant (such as use by the Fn API) then it may be
  // omitted.
  DisplayData display_data = 5;
}
| |
// Payload of the primitive ParDo transform.
message ParDoPayload {
  // (Required) SdkFunctionSpec describing the DoFn.
  SdkFunctionSpec do_fn = 1;

  // (Required) Extra pieces of context the DoFn may need that the rest of
  // the payload does not represent. These may force runners to execute the
  // ParDo differently.
  repeated Parameter parameters = 2;

  // (Optional) Side inputs keyed by local input name, describing the
  // expected access pattern.
  map<string, SideInput> side_inputs = 3;

  // (Optional) State specifications keyed by local state name.
  map<string, StateSpec> state_specs = 4;

  // (Optional) Timer specifications keyed by local timer name.
  map<string, TimerSpec> timer_specs = 5;

  // Whether the DoFn is splittable.
  bool splittable = 6;

  // (Required if splittable == true) Id of the restriction coder.
  string restriction_coder_id = 7;

  // (Optional) Only set when this ParDo can request bundle finalization.
  bool requests_finalization = 8;
}
| |
// A parameter that a UDF might require.
//
// How a runner sends these parameters to the SDK harness is the subject of
// the Fn API.
//
// How an SDK harness delivers them to the UDF is entirely up to the SDK. (For
// some SDKs there may be parameters that are not represented here, if the
// runner doesn't need to do anything.)
//
// Here, a parameter is simply an indicator to the runner that it needs to run
// the function a particular way.
//
// TODO: the evolution of the Fn API will influence what needs explicit
// representation here
message Parameter {
  // The kind of parameter being requested.
  Type.Enum type = 1;

  message Type {
    enum Enum {
      UNSPECIFIED = 0;
      WINDOW = 1;
      PIPELINE_OPTIONS = 2;
      RESTRICTION_TRACKER = 3;
    }
  }
}
| |
// Specification of one piece of state; exactly one of the variants in the
// `spec` oneof can be set at a time.
message StateSpec {
  oneof spec {
    ReadModifyWriteStateSpec read_modify_write_spec = 1;
    BagStateSpec bag_spec = 2;
    CombiningStateSpec combining_spec = 3;
    MapStateSpec map_spec = 4;
    SetStateSpec set_spec = 5;
  }
}
| |
// The read-modify-write variant of StateSpec.
message ReadModifyWriteStateSpec {
  // Id of a coder; presumably the coder of the stored value (TODO confirm).
  string coder_id = 1;
}
| |
// The bag variant of StateSpec.
message BagStateSpec {
  // Id of the coder for the elements.
  string element_coder_id = 1;
}
| |
// The combining variant of StateSpec.
message CombiningStateSpec {
  // Id of the coder for the accumulator.
  string accumulator_coder_id = 1;

  // SdkFunctionSpec of the combine function.
  SdkFunctionSpec combine_fn = 2;
}
| |
// The map variant of StateSpec.
message MapStateSpec {
  // Id of the coder for the keys.
  string key_coder_id = 1;

  // Id of the coder for the values.
  string value_coder_id = 2;
}
| |
// The set variant of StateSpec.
message SetStateSpec {
  // Id of the coder for the elements.
  string element_coder_id = 1;
}
| |
// Specification of a timer.
message TimerSpec {
  // The time domain of the timer.
  TimeDomain.Enum time_domain = 1;

  // Id of the coder for the timer.
  string timer_coder_id = 2;
}
| |
// Wrapper for the enum recording whether something is bounded or unbounded.
message IsBounded {
  enum Enum {
    UNSPECIFIED = 0;
    UNBOUNDED = 1;
    BOUNDED = 2;
  }
}
| |
// Payload of the primitive Read transform.
message ReadPayload {
  // (Required) SdkFunctionSpec describing the source of this Read.
  SdkFunctionSpec source = 1;

  // (Required) Whether the source is bounded or unbounded.
  IsBounded.Enum is_bounded = 2;

  // TODO: full audit of fields required by runners as opposed to SDK harness
}
| |
// Payload of the WindowInto transform.
message WindowIntoPayload {
  // (Required) SdkFunctionSpec describing the WindowFn.
  SdkFunctionSpec window_fn = 1;
}
| |
// Payload of the special-but-not-primitive Combine transform.
message CombinePayload {
  // (Required) SdkFunctionSpec describing the CombineFn.
  SdkFunctionSpec combine_fn = 1;

  // (Required) Reference to the Coder used for the CombineFn's accumulators.
  string accumulator_coder_id = 2;
}
| |
// Payload of the test-only primitive TestStream.
message TestStreamPayload {
  // (Required) The coder for elements in the TestStream events.
  string coder_id = 1;

  // The events of the TestStream.
  repeated Event events = 2;

  // A single TestStream event; exactly one variant of the `event` oneof can
  // be set at a time.
  message Event {
    oneof event {
      AdvanceWatermark watermark_event = 1;
      AdvanceProcessingTime processing_time_event = 2;
      AddElements element_event = 3;
    }

    message AdvanceWatermark {
      // NOTE(review): units are not stated here; presumably milliseconds
      // since the epoch, like other Beam timestamps. Confirm.
      int64 new_watermark = 1;
    }

    message AdvanceProcessingTime {
      // NOTE(review): units are not stated here; presumably milliseconds.
      // Confirm.
      int64 advance_duration = 1;
    }

    message AddElements {
      repeated TimestampedElement elements = 1;
    }
  }

  // An encoded element together with its timestamp.
  message TimestampedElement {
    // The element bytes; presumably encoded with the coder named by
    // `coder_id` (TODO confirm).
    bytes encoded_element = 1;

    // NOTE(review): units are not stated here; presumably milliseconds since
    // the epoch. Confirm.
    int64 timestamp = 2;
  }
}
// Payload of the special-but-not-primitive WriteFiles transform.
message WriteFilesPayload {
  // (Required) SdkFunctionSpec describing the FileBasedSink.
  SdkFunctionSpec sink = 1;

  // (Required) The format function.
  SdkFunctionSpec format_function = 2;

  bool windowed_writes = 3;

  bool runner_determined_sharding = 4;

  // Side inputs, keyed by name.
  map<string, SideInput> side_inputs = 5;
}
| |
// A coder, the binary format for serialization and deserialization of data in
// a pipeline.
message Coder {
  // (Required) A specification for the coder, as a URN plus parameters. This
  // may be a cross-language agreed-upon format, or it may be a "custom coder"
  // that can only be used by a particular SDK. It does not include component
  // coders, as it is beneficial for these to be comprehensible to a runner
  // regardless of whether the binary format is agreed-upon.
  FunctionSpec spec = 1;

  // (Optional) If this coder is parametric, such as ListCoder(VarIntCoder),
  // this is a list of the components. In order for encodings to be identical,
  // the FunctionSpec and all components must be identical, recursively.
  repeated string component_coder_ids = 2;
}
| |
// URNs of the standard coders, carried in the (beam_urn) enum value option,
// together with a description of each encoding.
message StandardCoders {
  enum Enum {
    // Components: None
    BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];

    // Components: None
    STRING_UTF8 = 10 [(beam_urn) = "beam:coder:string_utf8:v1"];

    // Components: The key and value coder, in that order.
    KV = 1 [(beam_urn) = "beam:coder:kv:v1"];

    // Components: None
    BOOL = 12 [(beam_urn) = "beam:coder:bool:v1"];

    // Variable-length encoding of a 64-bit integer.
    // Components: None
    VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];

    // Encodes the floating point value as a big-endian 64-bit integer
    // according to the IEEE 754 double format bit layout.
    // Components: None
    DOUBLE = 11 [(beam_urn) = "beam:coder:double:v1"];

    // Encodes an iterable of elements.
    //
    // The encoding for an iterable [e1...eN] of known length N is
    //
    //    fixed32(N)
    //    encode(e1) encode(e2) encode(e3) ... encode(eN)
    //
    // If the length is unknown, it is batched up into groups of size b1..bM
    // and encoded as
    //
    //    fixed32(-1)
    //    varInt64(b1) encode(e1) encode(e2) ... encode(e_b1)
    //    varInt64(b2) encode(e_(b1+1)) encode(e_(b1+2)) ... encode(e_(b1+b2))
    //    ...
    //    varInt64(bM) encode(e_(N-bM+1)) encode(e_(N-bM+2)) ... encode(eN)
    //    varInt64(0)
    //
    // Components: Coder for a single element.
    ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];

    // Encodes a timer containing a timestamp and a user specified payload.
    // The encoding is represented as: timestamp payload
    //
    //   timestamp - a big endian 8 byte integer representing
    //     millis-since-epoch. The encoded representation is shifted so that
    //     the byte representation of negative values are lexicographically
    //     ordered before the byte representation of positive values. This is
    //     typically done by subtracting -9223372036854775808 from the value
    //     and encoding it as a signed big endian integer. Example values:
    //
    //     -9223372036854775808: 00 00 00 00 00 00 00 00
    //                     -255: 7F FF FF FF FF FF FF 01
    //                       -1: 7F FF FF FF FF FF FF FF
    //                        0: 80 00 00 00 00 00 00 00
    //                        1: 80 00 00 00 00 00 00 01
    //                      256: 80 00 00 00 00 00 01 00
    //      9223372036854775807: FF FF FF FF FF FF FF FF
    //
    //   payload - user defined data, uses the component coder
    //
    // Components: Coder for the payload.
    TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];

    /*
     * The following coders are typically not specified manually by the user,
     * but are used at runtime and must be supported by every SDK.
     */
    // Components: None
    INTERVAL_WINDOW = 5 [(beam_urn) = "beam:coder:interval_window:v1"];

    // Components: The coder to attach a length prefix to
    LENGTH_PREFIX = 6 [(beam_urn) = "beam:coder:length_prefix:v1"];

    // Components: None
    GLOBAL_WINDOW = 7 [(beam_urn) = "beam:coder:global_window:v1"];

    // Encodes an element, the window the value is in, the timestamp of the
    // element, and the pane of the element.
    // Components: The element coder and the window coder, in that order
    WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"];

    // Encodes an iterable of elements, some of which may be stored elsewhere.
    //
    // The encoding for a state-backed iterable is the same as that for
    // an iterable, but the final varInt64(0) terminating the set of batches
    // may instead be replaced by
    //
    //    varInt64(-1)
    //    varInt64(len(token))
    //    token
    //
    // where token is an opaque byte string that can be used to fetch the
    // remainder of the iterable (e.g. over the state API).
    //
    // Components: Coder for a single element.
    // Experimental.
    STATE_BACKED_ITERABLE = 9
        [(beam_urn) = "beam:coder:state_backed_iterable:v1"];

    // Additional Standard Coders
    // --------------------------
    // The following coders are not required to be implemented for an SDK or
    // runner to support the Beam model, but enable users to take advantage of
    // schema-aware transforms.

    // Encodes a "row", an element with a known schema, defined by an
    // instance of Schema from schema.proto.
    //
    // A row is encoded as the concatenation of:
    //   - The number of attributes in the schema, encoded with
    //     beam:coder:varint:v1. This makes it possible to detect certain
    //     allowed schema changes (appending or removing columns) in
    //     long-running streaming pipelines.
    //   - A byte array representing a packed bitset indicating null fields (a
    //     1 indicating a null) encoded with beam:coder:bytes:v1. The unused
    //     bits in the last byte must be set to 0. If there are no nulls an
    //     empty byte array is encoded.
    //     The two-byte bitset (not including the length-prefix) for the row
    //     [NULL, 0, 0, 0, NULL, 0, 0, NULL, 0, NULL] would be
    //     [0b10010001, 0b00000010]
    //   - An encoding for each non-null field, concatenated together.
    //
    // Schema types are mapped to coders as follows:
    //   AtomicType:
    //     BYTE:      not yet a standard coder (BEAM-7996)
    //     INT16:     not yet a standard coder (BEAM-7996)
    //     INT32:     beam:coder:varint:v1
    //     INT64:     beam:coder:varint:v1
    //     FLOAT:     not yet a standard coder (BEAM-7996)
    //     DOUBLE:    beam:coder:double:v1
    //     STRING:    beam:coder:string_utf8:v1
    //     BOOLEAN:   beam:coder:bool:v1
    //     BYTES:     beam:coder:bytes:v1
    //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
    //   MapType:     not yet a standard coder (BEAM-7996)
    //   RowType:     beam:coder:row:v1
    //   LogicalType: Uses the coder for its representation.
    //
    // The payload for RowCoder is an instance of Schema.
    // Components: None
    // Experimental.
    ROW = 13 [(beam_urn) = "beam:coder:row:v1"];
  }
}
| |
// Describes the window function, triggering, allowed lateness, and
// accumulation mode of a PCollection.
//
// TODO: consider inlining field on PCollection
message WindowingStrategy {
  // (Required) SdkFunctionSpec of the UDF that assigns windows, merges
  // windows, and shifts timestamps before they are combined according to the
  // OutputTime.
  SdkFunctionSpec window_fn = 1;

  // (Required) Whether or not the window fn is merging.
  //
  // Many optimizations require this knowledge.
  MergeStatus.Enum merge_status = 2;

  // (Required) The coder for the windows of this PCollection.
  string window_coder_id = 3;

  // (Required) The trigger to use when grouping this PCollection.
  Trigger trigger = 4;

  // (Required) Whether new panes are a full replacement for prior panes or
  // are deltas to be combined with other panes (the combine should correspond
  // to whatever the upstream grouping transform is).
  AccumulationMode.Enum accumulation_mode = 5;

  // (Required) How a grouping transform computes the aggregate timestamp.
  // The window_fn will first possibly shift it later, then the OutputTime
  // takes the max, min, or ignores it and takes the end of window.
  //
  // This is actually only for input to grouping transforms, but since they
  // may be introduced in runner-specific ways, it is carried along with the
  // windowing strategy.
  OutputTime.Enum output_time = 6;

  // (Required) Indicates when output should be omitted upon window
  // expiration.
  ClosingBehavior.Enum closing_behavior = 7;

  // (Required) The duration, in milliseconds, beyond the end of a window at
  // which the window becomes droppable.
  int64 allowed_lateness = 8;

  // (Required) Indicates whether empty on-time panes should be omitted.
  //
  // NOTE(review): this field name is PascalCase and identical to the
  // OnTimeBehavior message name, unlike the lower_snake_case used by every
  // other field here. It is left unchanged because renaming a field alters
  // generated accessors and JSON names.
  OnTimeBehavior.Enum OnTimeBehavior = 9;

  // (Required) Whether or not the window fn assigns inputs to exactly one
  // window.
  //
  // Some optimizations require this knowledge.
  bool assigns_to_one_window = 10;
}
| |
// Records whether a PCollection's WindowFn is non-merging, merging, or
// merging-but-already-merged. In the last case a subsequent GroupByKey is
// almost always going to do something the user does not want.
message MergeStatus {
  enum Enum {
    UNSPECIFIED = 0;

    // No merging is required by the WindowFn.
    // Examples: global window, FixedWindows, SlidingWindows
    NON_MERGING = 1;

    // The WindowFn is merging, and merging has not yet been performed on the
    // PCollection.
    // Example: Sessions prior to a GroupByKey
    NEEDS_MERGE = 2;

    // The WindowFn is merging, and merging has already occurred on the
    // PCollection.
    // Example: Sessions after a GroupByKey
    ALREADY_MERGED = 3;
  }
}
| |
// Selects whether subsequent outputs of an aggregation are entire replacement
// values or just the aggregation of the inputs received since the prior
// output.
message AccumulationMode {
  enum Enum {
    UNSPECIFIED = 0;

    // The aggregation is discarded when it is output.
    DISCARDING = 1;

    // The aggregation is accumulated across outputs.
    ACCUMULATING = 2;

    // The aggregation emits retractions when it is output.
    RETRACTING = 3;
  }
}
| |
// Controls whether or not an aggregating transform should output data
// when a window expires.
message ClosingBehavior {
  enum Enum {
    UNSPECIFIED = 0;

    // Emit output when a window expires, whether or not there has been
    // any new data since the last output.
    EMIT_ALWAYS = 1;

    // Only emit output when new data has arrived since the last output.
    EMIT_IF_NONEMPTY = 2;
  }
}
| |
// Selects whether an aggregating transform outputs data when an on-time pane
// is empty.
message OnTimeBehavior {
  enum Enum {
    UNSPECIFIED = 0;

    // The on-time pane always fires. An element is produced even if no new
    // data has arrived since the previous firing.
    FIRE_ALWAYS = 1;

    // The on-time pane fires only if new data has arrived since the previous
    // firing.
    FIRE_IF_NONEMPTY = 2;
  }
}
| |
// Selects the timestamp of the output that results from aggregating a number
// of windowed, timestamped inputs.
message OutputTime {
  enum Enum {
    UNSPECIFIED = 0;

    // The output takes the timestamp of the end of the window.
    END_OF_WINDOW = 1;

    // The output takes the latest timestamp among the input elements since
    // the last output.
    LATEST_IN_PANE = 2;

    // The output takes the earliest timestamp among the input elements since
    // the last output.
    EARLIEST_IN_PANE = 3;
  }
}
| |
// The time domains of the Beam model.
message TimeDomain {
  enum Enum {
    UNSPECIFIED = 0;

    // Event time: time from the perspective of the data.
    EVENT_TIME = 1;

    // Processing time: time from the perspective of the execution of your
    // pipeline.
    PROCESSING_TIME = 2;

    // Synchronized processing time: the minimum of the processing time of
    // all pending elements.
    //
    // The "processing time" of an element refers to the local processing
    // time at which it was emitted.
    SYNCHRONIZED_PROCESSING_TIME = 3;
  }
}
| |
| // A small DSL describing when new aggregations are emitted from a |
| // GroupByKey or CombinePerKey. |
| // |
| // Each trigger is defined by the condition under which it is _ready_ to |
| // permit output. |
| message Trigger { |
| |
|   // The full disjoint union of possible triggers. Field numbers are not in |
|   // declaration order: `always` uses 12. |
|   oneof trigger { |
|     AfterAll after_all = 1; |
|     AfterAny after_any = 2; |
|     AfterEach after_each = 3; |
|     AfterEndOfWindow after_end_of_window = 4; |
|     AfterProcessingTime after_processing_time = 5; |
|     AfterSynchronizedProcessingTime after_synchronized_processing_time = 6; |
|     Always always = 12; |
|     Default default = 7; |
|     ElementCount element_count = 8; |
|     Never never = 9; |
|     OrFinally or_finally = 10; |
|     Repeat repeat = 11; |
|   } |
| |
|   // Ready once every subtrigger is ready. |
|   message AfterAll { |
|     repeated Trigger subtriggers = 1; |
|   } |
| |
|   // Ready as soon as any one subtrigger is ready. |
|   message AfterAny { |
|     repeated Trigger subtriggers = 1; |
|   } |
| |
|   // Begins with the first subtrigger and is ready when the _current_ |
|   // subtrigger is ready. After output, the current trigger advances by one. |
|   message AfterEach { |
|     repeated Trigger subtriggers = 1; |
|   } |
| |
|   // Ready after the input watermark is past the end of the window. |
|   // |
|   // May carry implicitly-repeated subtriggers for early and late firings. |
|   // On reaching the end of the window, the trigger transitions between |
|   // those subtriggers. |
|   message AfterEndOfWindow { |
| |
|     // (Optional) Trigger that governs output before the end of the window. |
|     Trigger early_firings = 1; |
| |
|     // (Optional) Trigger that governs output after the end of the window. |
|     Trigger late_firings = 2; |
|   } |
| |
|   // Once input arrives, ready when the specified delay has passed. |
|   message AfterProcessingTime { |
| |
|     // (Required) The transforms applied, in order, to the timestamp of an |
|     // arriving element. |
|     repeated TimestampTransform timestamp_transforms = 1; |
|   } |
| |
|   // Ready whenever upstream processing time has fully caught up with the |
|   // arrival time of an input element. |
|   message AfterSynchronizedProcessingTime {} |
| |
|   // The default trigger. It is equivalent to Repeat { AfterEndOfWindow } |
|   // but is denoted separately to show that the user did not alter the |
|   // triggering. |
|   message Default {} |
| |
|   // Ready whenever the requisite number of input elements have arrived. |
|   message ElementCount { |
|     // The requisite number of input elements. |
|     int32 element_count = 1; |
|   } |
| |
|   // Never ready. The only outputs are an ON_TIME output and a final output |
|   // at window expiration. |
|   message Never {} |
| |
|   // Always ready. ElementCount(1) expresses the same thing, but this form |
|   // is more explicit. |
|   message Always {} |
| |
|   // Ready whenever either of its subtriggers is ready, but output finishes |
|   // when the `finally` subtrigger fires. |
|   message OrFinally { |
| |
|     // (Required) Trigger that governs the main output; may fire repeatedly. |
|     Trigger main = 1; |
| |
|     // (Required) Trigger that governs the termination of output. |
|     Trigger finally = 2; |
|   } |
| |
|   // Ready whenever the subtrigger is ready; state is reset when the |
|   // subtrigger completes. |
|   message Repeat { |
|     // (Required) The trigger that is run repeatedly. |
|     Trigger subtrigger = 1; |
|   } |
| } |
| |
| // Describes one transformation applied to a timestamp. |
| // |
| // Its primary use is in AfterProcessingTime triggers, where it turns the |
| // arrival time of input into a target time for firing. |
| message TimestampTransform { |
|   // The kind of transformation to apply. |
|   oneof timestamp_transform { |
|     Delay delay = 1; |
|     AlignTo align_to = 2; |
|   } |
| |
|   // Shifts the timestamp by a delay. |
|   message Delay { |
|     // (Required) The delay, in milliseconds. |
|     int64 delay_millis = 1; |
|   } |
| |
|   // Quantizes delays to a period, with an offset. |
|   // NOTE(review): field numbers here start at 3; tags 1 and 2 are |
|   // unassigned and not reserved. |
|   message AlignTo { |
|     // (Required) The duration, in milliseconds, to which delays should be |
|     // quantized. |
|     int64 period = 3; |
| |
|     // (Required) The offset from 0, in milliseconds, for the quantization |
|     // given by `period`. |
|     int64 offset = 4; |
|   } |
| } |
| |
| // Specifies how a PCollection is to be used as a "side input". |
| message SideInput { |
|   // (Required) URN of the access pattern that the `view_fn` needs in order |
|   // to present the desired SDK-specific interface to a UDF. |
|   // |
|   // The access pattern defines the SDK harness <-> Runner Harness RPC |
|   // interface used to access a side input. |
|   // |
|   // Because of its superior performance possibilities, the only access |
|   // pattern intended for Beam is "beam:sideinput:multimap" (or some such |
|   // URN). |
|   FunctionSpec access_pattern = 1; |
| |
|   // (Required) The SdkFunctionSpec of the UDF that adapts a particular |
|   // access_pattern to a user-facing view type. |
|   // |
|   // Example: View.asSingleton() may include a `view_fn` that adapts a |
|   // specially-designed multimap to a single value per window. |
|   SdkFunctionSpec view_fn = 2; |
| |
|   // (Required) The SdkFunctionSpec of the UDF that maps a main input |
|   // window to a side input window. |
|   // |
|   // Example: with a main input in fixed windows of one hour, this can |
|   // specify that the side input is accessed according to the day in which |
|   // that hour falls. |
|   SdkFunctionSpec window_mapping_fn = 3; |
| } |
| |
| // An environment in which UDFs are executed. By default this is an SDK |
| // container URL, but it can also be a process forked by a command, or an |
| // externally managed process. |
| message Environment { |
|   // Field number 1 is reserved and cannot be assigned to a field. |
|   reserved 1; |
| |
|   // (Required) The URN of the payload. |
|   string urn = 2; |
| |
|   // (Optional) The data specifying any parameters to the URN. This may be |
|   // omitted when the URN does not require any arguments. |
|   bytes payload = 3; |
| } |
| |
| // Environment kinds, each tagged with its URN through the (beam_urn) |
| // enum value option. |
| message StandardEnvironments { |
|   enum Environments { |
|     // A managed docker container to run user code. |
|     DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"]; |
| |
|     // A managed native process to run user code. |
|     PROCESS = 1 [(beam_urn) = "beam:env:process:v1"]; |
| |
|     // An external non managed process to run user code. |
|     EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"]; |
|   } |
| } |
| |
| // Payload describing a Docker image. |
| message DockerPayload { |
|   // The container image; implicitly linux_amd64. |
|   string container_image = 1; |
| } |
| |
| // Payload describing a process to execute. |
| message ProcessPayload { |
|   // Operating system, e.g. "linux", "darwin", .. |
|   string os = 1; |
| |
|   // Architecture, e.g. "amd64", .. |
|   string arch = 2; |
| |
|   // The process to execute. |
|   string command = 3; |
| |
|   // Environment variables. |
|   map<string, string> env = 4; |
| } |
| |
| // Payload carrying an endpoint and extra parameters. |
| message ExternalPayload { |
|   // The endpoint descriptor. |
|   // NOTE(review): ApiServiceDescriptor is not declared in this part of the |
|   // file; presumably it comes from the imported endpoints.proto - confirm. |
|   ApiServiceDescriptor endpoint = 1; |
| |
|   // Arbitrary extra parameters to pass. |
|   map<string, string> params = 2; |
| } |
| |
| // Specifies a user defined function together with the environment that |
| // can invoke it. |
| message SdkFunctionSpec { |
| |
|   // (Required) The full specification of this function. |
|   FunctionSpec spec = 1; |
| |
|   // (Required) A reference to an execution environment that is capable of |
|   // invoking this function. |
|   string environment_id = 2; |
| } |
| |
| // Custom options that can be attached to enum values. |
| extend google.protobuf.EnumValueOptions { |
|   // Carries the standard URN of a pipeline entity such as a transform, |
|   // function or coder. Code should obtain the URN of such an entity by |
|   // reading the (beam_urn) extension instead of hard-coding the URN. |
|   // |
|   // Recommended declaration pattern (shown for coders): |
|   // |
|   //   message StandardCoders { |
|   //     enum Enum { |
|   //       BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"]; |
|   //       ... |
|   //     } |
|   //   } |
|   // |
|   // When entities of a type fall into multiple categories, use this |
|   // pattern instead (shown for PTransforms): |
|   // |
|   //   message StandardPTransforms { |
|   //     enum Primitives { |
|   //       ... |
|   //     } |
|   //     enum Composites { |
|   //       ... |
|   //     } |
|   //   } |
|   string beam_urn = 185324356; |
| |
|   // Carries a value for other constants. |
|   string beam_constant = 185324357; |
| } |
| |
| // A URN paired with a parameter object whose schema is determined by that |
| // URN. |
| // |
| // The structure is reused in two distinct, but compatible, ways: |
| // |
| // 1. As a specification of the function over PCollections that a |
| //    PTransform computes. |
| // 2. As a specification of a user-defined function, possibly |
| //    SDK-specific. (Context external to this message must be adequate to |
| //    indicate the environment in which the UDF can be understood.) |
| // |
| // Though not explicit in this proto, a runner can stand in one of two |
| // relationships to this specification: |
| // |
| // 1. The runner understands the URN. For example, it might be a |
| //    well-known URN like "beam:transform:Top" or |
| //    "beam:windowfn:FixedWindows" with an agreed-upon payload (e.g. a |
| //    number or duration, respectively). |
| // 2. The runner does not understand the URN. It might be an SDK specific |
| //    URN such as "beam:dofn:javasdk:1.0" that indicates to the SDK what |
| //    the payload is, such as a serialized Java DoFn from a particular |
| //    version of the Beam Java SDK. The payload will often then be an |
| //    opaque message such as bytes in a language-specific serialization |
| //    format. |
| message FunctionSpec { |
| |
|   // Field number 2 is skipped by this message's numbering. Reserving it |
|   // stops a future field from claiming the tag and being misread by peers |
|   // that might still attach an older meaning to it. |
|   // NOTE(review): confirm against the file's history that 2 was once used. |
|   reserved 2; |
| |
|   // (Required) A URN that describes the accompanying payload. |
|   // For any URN that is not recognized (by whomever is inspecting it) the |
|   // parameter payload should be treated as opaque and passed as-is. |
|   string urn = 1; |
| |
|   // (Optional) The data specifying any parameters to the URN. This may be |
|   // omitted when the URN does not require any arguments. |
|   bytes payload = 3; |
| } |
| |
| // TODO: transfer javadoc here |
| message DisplayData { |
| |
|   // The complete identifier of a DisplayData.Item. |
|   message Identifier { |
| |
|     // (Required) The transform from which this display data originates. |
|     string transform_id = 1; |
| |
|     // (Optional) The URN indicating the type of the originating transform, |
|     // if there is one. |
|     string transform_urn = 2; |
| |
|     // The key part of the identifier. |
|     // NOTE(review): no requirement level is stated for this field. |
|     string key = 3; |
|   } |
| |
|   // One item of display data. |
|   message Item { |
|     // (Required) |
|     Identifier id = 1; |
| |
|     // (Required) |
|     Type.Enum type = 2; |
| |
|     // (Required) |
|     google.protobuf.Any value = 3; |
| |
|     // (Optional) |
|     google.protobuf.Any short_value = 4; |
| |
|     // (Optional) |
|     string label = 5; |
| |
|     // (Optional) |
|     string link_url = 6; |
|   } |
| |
|   // Holder for the enum referenced by Item.type. |
|   message Type { |
|     enum Enum { |
|       UNSPECIFIED = 0; |
|       STRING = 1; |
|       INTEGER = 2; |
|       FLOAT = 3; |
|       BOOLEAN = 4; |
|       TIMESTAMP = 5; |
|       DURATION = 6; |
|       JAVA_CLASS = 7; |
|     } |
|   } |
| |
|   // (Required) The list of display data. |
|   repeated Item items = 1; |
| } |
| |
| |
| // The following transforms are not part of the RunnerApi specification, |
| // but may be useful for graph construction and manipulation. |
| |
| |
| // A disjoint union of everything that may contain references which need |
| // Components in order to be resolved. |
| message MessageWithComponents { |
| |
|   // (Optional) The by-reference components of the root message, which |
|   // make this a standalone message. |
|   // |
|   // When absent, the root is expected to contain no references. |
|   Components components = 1; |
| |
|   // (Required) The root message. It may contain pointers, which are |
|   // resolved by looking inside components. |
|   // NOTE(review): tags 5 and 10 are unassigned in this message and are |
|   // not reserved. |
|   oneof root { |
|     Coder coder = 2; |
|     CombinePayload combine_payload = 3; |
|     SdkFunctionSpec sdk_function_spec = 4; |
|     ParDoPayload par_do_payload = 6; |
|     PTransform ptransform = 7; |
|     PCollection pcollection = 8; |
|     ReadPayload read_payload = 9; |
|     SideInput side_input = 11; |
|     WindowIntoPayload window_into_payload = 12; |
|     WindowingStrategy windowing_strategy = 13; |
|     FunctionSpec function_spec = 14; |
|   } |
| } |
| |
| // Payload of an executable stage. Eventually it is passed to an SDK in |
| // the form of a ProcessBundleDescriptor. |
| message ExecutableStagePayload { |
| |
|   // Reference to a side input. A side input is uniquely identified by |
|   // PTransform id and local name. |
|   message SideInputId { |
|     // (Required) Id of the PTransform that references this side input. |
|     string transform_id = 1; |
| |
|     // (Required) Local name of this side input from the PTransform that |
|     // references it. |
|     string local_name = 2; |
|   } |
| |
|   // Reference to user state. A user state is uniquely identified by |
|   // PTransform id and local name. |
|   message UserStateId { |
|     // (Required) Id of the PTransform that references this user state. |
|     string transform_id = 1; |
| |
|     // (Required) Local name of this user state for the PTransform that |
|     // references it. |
|     string local_name = 2; |
|   } |
| |
|   // Reference to a timer. A timer is uniquely identified by PTransform id |
|   // and local name. |
|   message TimerId { |
|     // (Required) Id of the PTransform that references this timer. |
|     string transform_id = 1; |
| |
|     // (Required) Local name of this timer for the PTransform that |
|     // references it. |
|     string local_name = 2; |
|   } |
| |
|   // (Required) The environment in which this stage executes. |
|   // |
|   // An environment is used rather than an environment id because |
|   // ExecutableStages use environments directly. This may change in the |
|   // future. |
|   Environment environment = 1; |
| |
|   // (Required) The input PCollection id. It must be present as a value in |
|   // the inputs of any PTransform the ExecutableStagePayload is the payload |
|   // of. |
|   string input = 2; |
| |
|   // Side inputs required for this executable stage. Each side input of |
|   // each PTransform within this ExecutableStagePayload must be represented |
|   // within this field. |
|   repeated SideInputId side_inputs = 3; |
| |
|   // Ids of the PTransforms contained within this executable stage. At |
|   // least one PTransform id must be present. |
|   repeated string transforms = 4; |
| |
|   // Output PCollection ids. These must be equal to the values of the |
|   // outputs of any PTransform the ExecutableStagePayload is the payload of. |
|   repeated string outputs = 5; |
| |
|   // (Required) The components for the Executable Stage. They must contain |
|   // all of the Transforms in transforms, and the closure of all of the |
|   // components they recognize. |
|   Components components = 6; |
| |
|   // User states required for this executable stage. Each user state of |
|   // each PTransform within this ExecutableStagePayload must be represented |
|   // within this field. |
|   repeated UserStateId user_states = 7; |
| |
|   // Timers required for this executable stage. Each timer of each |
|   // PTransform within this ExecutableStagePayload must be represented |
|   // within this field. |
|   repeated TimerId timers = 8; |
| } |