// blob: 95693e37180c083c867805f9a9807b0ff73fed41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers describing the Fn API and bootstrapping.
*
* TODO: Usage of plural names in lists looks awkward in Java
* e.g. getOutputsMap, addCodersBuilder
*
* TODO: gRPC / proto field names conflict with generated code
* e.g. "class" in java, "output" in python
*/
syntax = "proto3";
/* TODO: Consider consolidating common components in another package
* and language namespaces for re-use with Runner Api.
*/
package org.apache.beam.model.fn_execution.v1;
option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1;fnexecution_v1";
option java_package = "org.apache.beam.model.fnexecution.v1";
option java_outer_classname = "BeamFnApi";
import "beam_runner_api.proto";
import "endpoints.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "metrics.proto";
// Describes how to reach a remote port through the Beam Fn Data API. It
// enables communication between two environments, for example the runner and
// the SDK.
// Stable
message RemoteGrpcPort {
  // (Required) API descriptor stating where to connect, including any
  // authentication that is needed.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor api_service_descriptor = 1;

  // (Required) ID of the Coder used to encode and decode the data sent over
  // this port.
  string coder_id = 2;
}
/*
* Control Plane API
*
* Progress reporting and splitting still need further vetting. Also, this may
* change with the addition of new types of instructions/responses related to
* metrics.
*/
// The API describing the work an SDK harness is expected to perform.
// Stable
service BeamFnControl {
  // Carries the instructions with which the runner asks the SDK for different
  // kinds of work.
  //
  // Request stream: responses to the instructions the SDK was asked to
  // perform.
  // Response stream: instructions the SDK is requested to perform.
  rpc Control(stream InstructionResponse) returns (stream InstructionRequest) {}

  // Fetches the full process bundle descriptor for a bundle one has been
  // asked to process.
  rpc GetProcessBundleDescriptor(GetProcessBundleDescriptorRequest)
      returns (ProcessBundleDescriptor) {}
}
// Asks for the ProcessBundleDescriptor that has the given id.
message GetProcessBundleDescriptorRequest {
  // Id of the ProcessBundleDescriptor being requested.
  string process_bundle_descriptor_id = 1;
}
// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message InstructionRequest {
  // (Required) A unique identifier provided by the runner which represents
  // this request's execution. The InstructionResponse MUST have the matching
  // id.
  string instruction_id = 1;

  // (Required) A request that the SDK Harness needs to interpret. Exactly one
  // of the members below carries the payload.
  oneof request {
    ProcessBundleRequest process_bundle = 1001;
    ProcessBundleProgressRequest process_bundle_progress = 1002;
    ProcessBundleSplitRequest process_bundle_split = 1003;
    FinalizeBundleRequest finalize_bundle = 1004;
    MonitoringInfosMetadataRequest monitoring_infos = 1005;
    HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;

    // DEPRECATED. The `deprecated` option makes generated code flag any use
    // of this member; the field number and wire format are unchanged.
    // NOTE(review): presumably superseded by
    // BeamFnControl.GetProcessBundleDescriptor - confirm.
    RegisterRequest register = 1000 [deprecated = true];
  }
}
// The response for an associated request the SDK had been asked to fulfill.
// Stable
message InstructionResponse {
  // (Required) A reference provided by the runner which represents a
  // request's execution. The InstructionResponse MUST have the matching id
  // when responding to the runner.
  string instruction_id = 1;

  // If this is specified, then this instruction has failed.
  // A human readable string representing the reason as to why processing has
  // failed.
  string error = 2;

  // If the instruction did not fail, it is required to return an equivalent
  // response type depending on the request this matches.
  oneof response {
    ProcessBundleResponse process_bundle = 1001;
    ProcessBundleProgressResponse process_bundle_progress = 1002;
    ProcessBundleSplitResponse process_bundle_split = 1003;
    FinalizeBundleResponse finalize_bundle = 1004;
    MonitoringInfosMetadataResponse monitoring_infos = 1005;
    HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;

    // DEPRECATED. The `deprecated` option makes generated code flag any use
    // of this member; the field number and wire format are unchanged.
    RegisterResponse register = 1000 [deprecated = true];
  }
}
// Asks for the full MonitoringInfo associated with the SDK harness process as
// a whole rather than with a particular bundle.
//
// SDKs may report metrics through an identifier that carries only the
// associated payload. A runner wanting the complete metrics information can
// fetch all the monitoring metadata with a MonitoringInfosMetadataRequest
// listing the ids it needs.
//
// Identifiers may be reused by the SDK for the lifetime of the associated
// control connection, provided the MonitoringInfo can be fully reconstructed
// by overwriting its payload field with the bytes specified here.
message HarnessMonitoringInfosRequest {
  // No fields.
}
// Carries monitoring data for the SDK harness as a whole.
message HarnessMonitoringInfosResponse {
  // Maps an identifier to a MonitoringInfo.payload. The entries are metrics
  // associated with the SDK harness, not with a specific bundle.
  //
  // SDKs may report metrics through an identifier that carries only the
  // associated payload. A runner wanting the complete metrics information can
  // fetch all the monitoring metadata with a MonitoringInfosMetadataRequest
  // listing the ids it needs.
  //
  // Identifiers may be reused by the SDK for the lifetime of the associated
  // control connection, provided the MonitoringInfo can be fully
  // reconstructed by overwriting its payload field with the bytes specified
  // here.
  map<string, bytes> monitoring_data = 1;
}
// Lists objects that the runner can refer to in future requests.
// Stable
message RegisterRequest {
  // (Optional) Descriptors used to process bundles.
  repeated ProcessBundleDescriptor process_bundle_descriptor = 1;
}
// Stable
message RegisterResponse {
  // No fields.
}
// Holds the definitions from which the bundle processing graph is built.
message ProcessBundleDescriptor {
  // (Required) Pipeline-level unique id by which this descriptor can be
  // referenced.
  string id = 1;

  // (Required) PTransforms keyed by pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.PTransform> transforms = 2;

  // (Required) PCollections keyed by pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.PCollection> pcollections = 3;

  // (Required) WindowingStrategies keyed by pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.WindowingStrategy> windowing_strategies = 4;

  // (Required) Coders keyed by pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.Coder> coders = 5;

  // (Required) Environments keyed by pipeline-scoped id.
  map<string, org.apache.beam.model.pipeline.v1.Environment> environments = 6;

  // Endpoint to use for State API calls. Must be set when the Runner intends
  // to send remote references over the data plane, or when any of the
  // transforms rely on user state or side inputs.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;

  // Endpoint to use for the Data API for user timers. Must be set when this
  // ProcessBundleDescriptor contains any transforms that have user timers.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor timer_api_service_descriptor = 8;
}
// One of the applications that specify the scope of work for a bundle.
// Further details:
// https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9
message BundleApplication {
  // (Required) Transform that the element is passed to.
  string transform_id = 1;

  // (Required) Name of the input of that transform which receives the
  // element.
  string input_id = 2;

  // (Required) The element, in encoded form, that is passed to the transform.
  bytes element = 3;

  // Keyed by the local output name of the PTransform. Every value is a lower
  // bound on the timestamps of the elements this PTransform produces into the
  // corresponding output PCollection when invoked with this application.
  //
  // When the RestrictionTracker reports no watermark, the runner uses
  // MIN_TIMESTAMP by default.
  map<string, google.protobuf.Timestamp> output_watermarks = 4;

  // Tells whether this application potentially produces an unbounded amount
  // of data. BOUNDED should be set if and only if the application is known to
  // produce a finite amount of output.
  org.apache.beam.model.pipeline.v1.IsBounded.Enum is_bounded = 5;
}
// An application that should be scheduled for execution after a delay.
// Either an absolute timestamp or a relative timestamp can represent a
// scheduled execution time.
// NOTE(review): only a relative delay (requested_time_delay) is declared in
// this message; confirm whether the absolute-timestamp wording still applies.
message DelayedBundleApplication {
  // (Required) The application to schedule.
  BundleApplication application = 1;

  // Recommended delay after which the runner should schedule the application
  // for execution. A delay equal to 0 may be scheduled to execute
  // immediately. The unit of the delay should be microsecond.
  // NOTE(review): google.protobuf.Duration already encodes seconds and nanos;
  // presumably microsecond precision is what is meant - confirm.
  google.protobuf.Duration requested_time_delay = 2;
}
// Asks the SDK harness to process a given bundle.
// Stable
message ProcessBundleRequest {
  // (Required) Id of the process bundle descriptor that the SDK harness must
  // instantiate and execute.
  string process_bundle_descriptor_id = 1;

  // Token an SDK can use to check whether cached elements associated with a
  // cache token are still valid.
  message CacheToken {
    // Marker stating that the cache token is valid for all user state.
    message UserState {
      // No fields.
    }

    // Marker stating that the cache token is valid for one side input.
    message SideInput {
      // (Required) Id of the PTransform that contains the side input.
      string transform_id = 1;

      // (Required) Id of the side input.
      string side_input_id = 2;
    }

    // Scope of the cache token.
    oneof type {
      UserState user_state = 1;
      SideInput side_input = 2;
    }

    // Identifier of the cache token; it should be globally unique.
    bytes token = 10;
  }

  // (Optional) Cache tokens that let an SDK reuse cached data returned by the
  // State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}
// The outcome of processing a bundle; related to a ProcessBundleRequest.
message ProcessBundleResponse {
  // (Optional) Specifies that the bundle has not been completed and the
  // following applications need to be scheduled and executed in the future.
  // A runner that does not yet support residual roots MUST still check that
  // this is empty for correctness.
  repeated DelayedBundleApplication residual_roots = 2;

  // DEPRECATED (Required) The list of metrics or other MonitoredState
  // collected while processing this bundle.
  // The `deprecated` option makes generated code flag any use of this field;
  // the field number and wire format are unchanged.
  // NOTE(review): monitoring_data below looks like the replacement - confirm.
  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3 [deprecated = true];

  // (Optional) Specifies that the runner must callback to this worker
  // once the output of the bundle is committed. The Runner must send a
  // FinalizeBundleRequest with the instruction id of the ProcessBundleRequest
  // that is related to this ProcessBundleResponse.
  bool requires_finalization = 4;

  // An identifier to MonitoringInfo.payload mapping.
  //
  // An SDK can report metrics using an identifier that only contains the
  // associated payload. A runner who wants to receive the full metrics
  // information can request all the monitoring metadata via a
  // MonitoringInfosMetadataRequest providing a list of ids as necessary.
  //
  // The SDK is allowed to reuse the identifiers across multiple bundles as
  // long as the MonitoringInfo could be reconstructed fully by overwriting
  // its payload field with the bytes specified here.
  map<string, bytes> monitoring_data = 5;

  // Field number 1 is reserved and must not be reused.
  reserved 1;
}
// Asks for progress information about a given bundle.
// Handling this request is optional; it supports advanced SDK features such
// as SplittableDoFn, user level metrics etc.
message ProcessBundleProgressRequest {
  // (Required) Instruction id of an active process bundle request.
  string instruction_id = 1;
}
// Asks for the full MonitoringInfo of each id in a provided set.
//
// SDKs may report metrics through an identifier that carries only the
// associated payload. A runner wanting the complete metrics information can
// fetch all the monitoring metadata with a MonitoringInfosMetadataRequest
// listing the ids it needs.
//
// Identifiers may be reused by the SDK for the lifetime of the associated
// control connection, provided the MonitoringInfo can be fully reconstructed
// by overwriting its payload field with the bytes specified here.
message MonitoringInfosMetadataRequest {
  // Ids whose full MonitoringInfo is requested.
  repeated string monitoring_info_id = 1;
}
// Progress information for a bundle; the counterpart of
// ProcessBundleProgressRequest.
message ProcessBundleProgressResponse {
  // DEPRECATED (Required) The list of metrics or other MonitoredState
  // collected while processing this bundle.
  // The `deprecated` option makes generated code flag any use of this field;
  // the field number and wire format are unchanged.
  // NOTE(review): monitoring_data below looks like the replacement - confirm.
  repeated org.apache.beam.model.pipeline.v1.MonitoringInfo monitoring_infos = 3 [deprecated = true];

  // An identifier to MonitoringInfo.payload mapping.
  //
  // An SDK can report metrics using an identifier that only contains the
  // associated payload. A runner who wants to receive the full metrics
  // information can request all the monitoring metadata via a
  // MonitoringInfosMetadataRequest providing a list of ids as necessary.
  //
  // The SDK is allowed to reuse the identifiers
  // for the lifetime of the associated control connection as long
  // as the MonitoringInfo could be reconstructed fully by overwriting its
  // payload field with the bytes specified here.
  map<string, bytes> monitoring_data = 5;

  // Field numbers 1, 2 and 4 are reserved and must not be reused.
  reserved 1, 2, 4;
}
// Returns the full mapping information for a specified set of identifiers.
//
// SDKs may report metrics through an identifier that carries only the
// associated payload. A runner wanting the complete metrics information can
// fetch all the monitoring metadata with a MonitoringInfosMetadataRequest
// listing the ids it needs.
//
// Identifiers may be reused by the SDK for the lifetime of the associated
// control connection, provided the MonitoringInfo can be fully reconstructed
// by overwriting its payload field with the bytes specified here.
message MonitoringInfosMetadataResponse {
  // Full metrics information keyed by identifier.
  map<string, org.apache.beam.model.pipeline.v1.MonitoringInfo> monitoring_info = 1;
}
// Asks the SDK to split a bundle that is currently active.
message ProcessBundleSplitRequest {
  // (Required) Instruction id of an active process bundle request.
  string instruction_id = 1;

  // Describes the split desired for one transform.
  message DesiredSplit {
    // (Required) Fraction of the known remaining work in this bundle, for
    // this transform, that the SDK should keep after the split.
    //
    // A value of 0 means "checkpoint" as soon as possible: keep as little
    // work as possible and return the remainder.
    double fraction_of_remainder = 1;

    // Element indices at which the SDK is allowed to split. An empty set
    // places no constraints on where to split.
    repeated int64 allowed_split_points = 3;

    // (Required for GrpcRead operations) Total number of elements expected to
    // be sent to this GrpcRead operation. It is needed to account correctly
    // for unreceived data when determining where to split.
    int64 estimated_input_elements = 2;

    // TODO(SDF): Allow providing weights rather than sizes.
  }

  // (Required) The desired split for each transform.
  //
  // Only splits at GRPC read operations are supported at present, which may,
  // of course, limit the amount of work downstream operations receive.
  map<string, DesiredSplit> desired_splits = 3;
}
// Describes a partition of the bundle into a "primary" and a "residual" with
// these properties:
// - Primary and residual work do not overlap and together add up to the work
//   the current bundle would have had if the split hadn't happened.
// - If the current bundle keeps executing, it will have done none of the work
//   under residual_roots.
// - If no further splits happen, the current bundle will have done exactly
//   the work under primary_roots.
// More rigorous definitions: https://s.apache.org/beam-breaking-fusion
message ProcessBundleSplitResponse {
  // Root applications that should replace the current bundle.
  repeated BundleApplication primary_roots = 1;

  // Root applications removed from the current bundle that have to be
  // executed in a separate bundle (e.g. in parallel on a different worker, or
  // after the current bundle completes, etc.)
  repeated DelayedBundleApplication residual_roots = 2;

  // Contiguous portions of the data channel that are either entirely
  // processed or entirely unprocessed, belonging to the primary or the
  // residual respectively.
  //
  // This gives a more efficient representation over the FnAPI (if the bundle
  // is large) and often a more efficient representation on the runner side
  // too (e.g. if the set of elements can be represented as some range in an
  // underlying dataset).
  message ChannelSplit {
    // (Required) The grpc read transform that reads this channel.
    string transform_id = 1;

    // Absolute index, in the (ordered) channel, of the last input element
    // that should be considered entirely part of the primary.
    int64 last_primary_element = 2;

    // Absolute index, in the (ordered) channel, of the first input element
    // that should be considered entirely part of the residual.
    int64 first_residual_element = 3;
  }

  // Partitions of input data channels into primary and residual elements, if
  // any. Elements represented in the bundle application roots above should
  // not be included.
  repeated ChannelSplit channel_splits = 3;
}
// Refers to a completed bundle by its instruction id.
message FinalizeBundleRequest {
  // (Required) Instruction id of a completed process bundle request.
  string instruction_id = 1;
}
// Carries no fields.
message FinalizeBundleResponse {
  // Empty
}
/*
* Data Plane API
*/
// Messages that represent logical byte streams.
// Stable
message Elements {
  // Multiple encoded elements, in nested context, for a given named
  // instruction and transform.
  message Data {
    // (Required) Instruction id of an active instruction request.
    string instruction_id = 1;

    // (Required) Identifies a consumer or producer of this data. When a
    // harness receives the message, this is the consumer inside that harness
    // that should consume these bytes. When a harness sends the message, this
    // is the producer of these bytes.
    //
    // A single element may span multiple Data messages.
    //
    // A sending/receiving pair should share the same identifier.
    string transform_id = 2;

    // (Optional) One part of a logical byte stream. The elements of the
    // logical byte stream are encoded in the nested context and concatenated
    // together.
    bytes data = 3;

    // (Optional) Set to indicate that this is the last data block for the
    // given instruction and transform, ending the stream.
    bool is_last = 4;
  }

  // The encoded user timers for a given instruction, transform and timer id.
  message Timers {
    // (Required) Instruction id of an active instruction request.
    string instruction_id = 1;

    // (Required) Identifies a consumer or producer of this data. When a
    // harness receives the message, this is the consumer inside that harness
    // that should consume these timers. When a harness sends the message,
    // this is the producer of these timers.
    string transform_id = 2;

    // (Required) Local timer family name that identifies the associated timer
    // family specification.
    string timer_family_id = 3;

    // (Optional) A logical byte stream of timers, encoded according to the
    // coder in the timer spec.
    bytes timers = 4;

    // (Optional) Set to indicate that this is the last data block for the
    // given instruction and transform, ending the stream.
    bool is_last = 5;
  }

  // (Optional) Parts of logical byte streams.
  repeated Data data = 1;

  // (Optional) Timer byte streams.
  repeated Timers timers = 2;
}
// Stable
service BeamFnData {
  // Sends data between harnesses.
  //
  // Request stream: data representing input.
  // Response stream: data representing output.
  rpc Data(stream Elements) returns (stream Elements) {}
}
/*
* State API
*/
// A single request made through the State API.
message StateRequest {
  // (Required) Unique identifier, provided by the SDK, representing this
  // request's execution. The StateResponse that corresponds to this request
  // carries the matching id.
  string id = 1;

  // (Required) Instruction id of the work currently being processed. It lets
  // the runner associate any state modifications to be committed with the
  // appropriate work execution.
  string instruction_id = 2;

  // (Required) The state key that this request targets.
  StateKey state_key = 3;

  // (Required) The action this request performs.
  oneof request {
    // Get state.
    StateGetRequest get = 1000;

    // Append to state.
    StateAppendRequest append = 1001;

    // Clear state.
    StateClearRequest clear = 1002;
  }
}
// The reply to a StateRequest.
message StateResponse {
  // (Required) Reference, provided by the SDK, representing a request's
  // execution. The StateResponse must carry the matching id when responding
  // to the SDK.
  string id = 1;

  // (Optional) When set, the state request has failed. Holds a human readable
  // string giving the reason why the request failed.
  string error = 2;

  // The response corresponding to the request is populated here.
  oneof response {
    // Response to getting state.
    StateGetResponse get = 1000;

    // Response to appending to state.
    StateAppendResponse append = 1001;

    // Response to clearing state.
    StateClearResponse clear = 1002;
  }
}
// Service through which state held by the runner is accessed.
service BeamFnState {
  // Gets, appends to, or clears state that the runner stores on behalf of the
  // SDK.
  //
  // Request stream: state instructions requested of the runner.
  // Response stream: responses to the state instructions the runner was asked
  // to perform.
  rpc State(stream StateRequest) returns (stream StateResponse) {}
}
// Identifies the state that a StateRequest operates on. Exactly one of the
// key kinds declared below is selected through the `type` oneof.
message StateKey {
  // A key whose content is supplied by the runner.
  message Runner {
    // (Required) Opaque information supplied by the runner, used to support
    // remote references.
    // https://s.apache.org/beam-fn-api-send-and-receive-data
    //
    // Used by state backed iterables, in which case the request can only be
    // of type get. For details see:
    // https://s.apache.org/beam-fn-api-state-backed-iterables
    bytes key = 1;
  }

  // Requests the values associated with a specified window in a PCollection.
  // See https://s.apache.org/beam-fn-state-api-and-bundle-processing for
  // further details.
  //
  // Usable only for StateGetRequests on side inputs of the URN
  // beam:side_input:iterable:v1.
  //
  // For a PCollection<V>, the response data stream is a concatenation of all
  // V's. See https://s.apache.org/beam-fn-api-send-and-receive-data for
  // further details.
  message IterableSideInput {
    // (Required) Id of the PTransform that contains the side input.
    string transform_id = 1;

    // (Required) Id of the side input.
    string side_input_id = 2;

    // (Required) The window, obtained by mapping the currently executing
    // element's window into the side input's window domain, encoded in a
    // nested context.
    bytes window = 3;
  }

  // Requests the values associated with a specified user key and window in a
  // PCollection. See
  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
  // details.
  //
  // Usable only for StateGetRequests on side inputs of the URN
  // beam:side_input:multimap:v1.
  //
  // For a PCollection<KV<K, V>>, the response data stream is a concatenation
  // of all V's associated with the specified key K. See
  // https://s.apache.org/beam-fn-api-send-and-receive-data for further
  // details.
  message MultimapSideInput {
    // (Required) Id of the PTransform that contains the side input.
    string transform_id = 1;

    // (Required) Id of the side input.
    string side_input_id = 2;

    // (Required) The window, obtained by mapping the currently executing
    // element's window into the side input's window domain, encoded in a
    // nested context.
    bytes window = 3;

    // (Required) The key, encoded in a nested context.
    bytes key = 4;
  }

  // Requests the keys associated with a specified window in a PCollection.
  // See https://s.apache.org/beam-fn-state-api-and-bundle-processing for
  // further details.
  //
  // Usable only for StateGetRequests on side inputs of the URN
  // beam:side_input:multimap:v1.
  //
  // For a PCollection<KV<K, V>>, the response data stream is a concatenation
  // of all K's associated with the specified window. See
  // https://s.apache.org/beam-fn-api-send-and-receive-data for further
  // details.
  message MultimapKeysSideInput {
    // (Required) Id of the PTransform that contains the side input.
    string transform_id = 1;

    // (Required) Id of the side input.
    string side_input_id = 2;

    // (Required) The window, obtained by mapping the currently executing
    // element's window into the side input's window domain, encoded in a
    // nested context.
    bytes window = 3;
  }

  // Requests an unordered set of values associated with a specified user key
  // and window for a PTransform. See
  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
  // details.
  //
  // The response data stream is a concatenation of all V's associated with
  // the specified user key and window.
  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
  // details.
  message BagUserState {
    // (Required) Id of the PTransform that contains the user state.
    string transform_id = 1;

    // (Required) Id of the user state.
    string user_state_id = 2;

    // (Required) The window, encoded in a nested context.
    bytes window = 3;

    // (Required) Key of the currently executing element, encoded in a nested
    // context.
    bytes key = 4;
  }

  // Requests the keys of a multimap associated with a specified user key and
  // window for a PTransform. See
  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
  // details.
  //
  // Usable only for StateGetRequests and StateClearRequests on the user
  // state.
  //
  // The response data stream is a concatenation of all K's associated with
  // the specified user key and window.
  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
  // details.
  message MultimapKeysUserState {
    // (Required) Id of the PTransform that contains the user state.
    string transform_id = 1;

    // (Required) Id of the user state.
    string user_state_id = 2;

    // (Required) The window, encoded in a nested context.
    bytes window = 3;

    // (Required) Key of the currently executing element, encoded in a nested
    // context.
    bytes key = 4;
  }

  // Requests the values of the map key associated with a specified user key
  // and window for a PTransform. See
  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
  // details.
  //
  // The response data stream is a concatenation of all V's associated with
  // the specified map key, user key, and window.
  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
  // details.
  message MultimapUserState {
    // (Required) Id of the PTransform that contains the user state.
    string transform_id = 1;

    // (Required) Id of the user state.
    string user_state_id = 2;

    // (Required) The window, encoded in a nested context.
    bytes window = 3;

    // (Required) Key of the currently executing element, encoded in a nested
    // context.
    bytes key = 4;

    // (Required) The map key, encoded in a nested context.
    bytes map_key = 5;
  }

  // (Required) One of the following state keys must be set.
  oneof type {
    Runner runner = 1;
    MultimapSideInput multimap_side_input = 2;
    BagUserState bag_user_state = 3;
    IterableSideInput iterable_side_input = 4;
    MultimapKeysSideInput multimap_keys_side_input = 5;
    MultimapKeysUserState multimap_keys_user_state = 6;
    MultimapUserState multimap_user_state = 7;
  }
}
// Asks to read state.
message StateGetRequest {
  // (Optional) When set, tells the runner that the response should resume
  // from this continuation token.
  //
  // When unset, tells the runner that the response should start from the
  // beginning of the logical continuable stream.
  bytes continuation_token = 1;
}
// Reply to a state get. It represents a logical byte stream that can be
// continued through the state API.
message StateGetResponse {
  // (Optional) When set, a token that can be used with the state API to fetch
  // the next chunk of this logical byte stream. The end of the logical byte
  // stream is signalled by this field being unset.
  bytes continuation_token = 1;

  // One part of a logical byte stream. The elements of the logical byte
  // stream are encoded in the nested context and concatenated together.
  bytes data = 2;
}
// Asks to append to state.
message StateAppendRequest {
  // One part of a logical byte stream. The elements of the logical byte
  // stream are encoded in the nested context, and the data of multiple append
  // requests is concatenated together.
  bytes data = 1;
}
// Reply to a state append.
message StateAppendResponse {
  // No fields.
}
// Asks to clear state.
message StateClearRequest {
  // No fields.
}
// Reply to a state clear.
message StateClearResponse {
  // No fields.
}
/*
* Logging API
*
* This is very stable. There can be some changes to how we define a LogEntry,
* to increase/decrease the severity types, the way we format an exception/stack
* trace, or the log site.
*/
// A single log entry.
message LogEntry {
  // A batch of log entries. It allows multiple log messages to be buffered
  // and batched when using the logging API.
  message List {
    // (Required) One or more log messages.
    repeated LogEntry log_entries = 1;
  }

  // Severity of the event a log entry describes, given as one of the levels
  // below. The levels are assigned the listed numeric values for reference;
  // the effect of using any other numeric value is undefined.
  //
  // Writers of log entries should map other severity encodings onto one of
  // these standard levels. For example, all of Java's FINE, FINER, and FINEST
  // levels might be mapped to `Severity.DEBUG`.
  //
  // The list is intentionally not comprehensive. It is meant as a common set
  // of "good enough" severity levels so that logging front ends can filter
  // and search across log types. Users of the API are free not to use every
  // severity level in their log messages.
  message Severity {
    enum Enum {
      // Unspecified level information. Will be logged at the TRACE level.
      UNSPECIFIED = 0;

      // NOTE(review): no description is given for this level; presumably
      // trace-level information - confirm.
      TRACE = 1;

      // Debugging information.
      DEBUG = 2;

      // Normal events.
      INFO = 3;

      // Normal but significant events, such as start up, shut down, or
      // configuration.
      NOTICE = 4;

      // Warning events might cause problems.
      WARN = 5;

      // Error events are likely to cause problems.
      ERROR = 6;

      // Critical events cause severe problems or brief outages and may
      // indicate that a person must take action.
      CRITICAL = 7;
    }
  }

  // (Required) Severity of the log statement.
  Severity.Enum severity = 1;

  // (Required) Time at which the log statement occurred.
  google.protobuf.Timestamp timestamp = 2;

  // (Required) Human readable message.
  string message = 3;

  // (Optional) Trace of the functions involved. In Java, for example, this
  // can include multiple causes and multiple suppressed exceptions.
  string trace = 4;

  // (Optional) Reference to the instruction this log statement is associated
  // with.
  string instruction_id = 5;

  // (Optional) Reference to the transform this log statement is associated
  // with.
  string transform_id = 6;

  // (Optional) Human-readable name of the function or method being invoked,
  // optionally with context such as the class or package name. The format can
  // vary by language. For example:
  //   qual.if.ied.Class.method (Java)
  //   dir/package.func (Go)
  //   module.function (Python)
  //   file.cc:382 (C++)
  string log_location = 7;

  // (Optional) Name of the thread this log statement is associated with.
  string thread = 8;
}
// Log control message; it currently declares no fields.
message LogControl {
  // No fields.
}
// Stable
service BeamFnLogging {
  // Lets the SDK emit log entries that the runner can associate with the
  // active job.
  //
  // Request stream: log entries, batched into lists, emitted by the SDK
  // harness.
  // Response stream: log control messages used to configure the SDK.
  rpc Logging(stream LogEntry.List) returns (stream LogControl) {}
}
// Request message of BeamFnExternalWorkerPool.StartWorker.
message StartWorkerRequest {
  // ID of the SDK worker to start.
  string worker_id = 1;

  // NOTE(review): the four endpoints below are undocumented; presumably they
  // tell the started worker where to reach the named services - confirm.
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor control_endpoint = 2;
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor logging_endpoint = 3;
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor artifact_endpoint = 4;
  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor provision_endpoint = 5;

  // String-to-string parameters. NOTE(review): their meaning is not specified
  // here - confirm against the worker pool implementation.
  map<string, string> params = 10;
}
// Response message of BeamFnExternalWorkerPool.StartWorker.
message StartWorkerResponse {
  // NOTE(review): presumably a failure description that is left empty on
  // success - confirm.
  string error = 1;
}
// Request message of BeamFnExternalWorkerPool.StopWorker.
message StopWorkerRequest {
  // NOTE(review): presumably the ID of the SDK worker to stop - confirm.
  string worker_id = 1;
}
// Response message of BeamFnExternalWorkerPool.StopWorker.
message StopWorkerResponse {
  // NOTE(review): presumably a failure description that is left empty on
  // success - confirm.
  string error = 1;
}
// Service for starting and stopping SDK workers.
service BeamFnExternalWorkerPool {
  // Starts the SDK worker that has the given ID.
  rpc StartWorker(StartWorkerRequest) returns (StartWorkerResponse) {}

  // Stops the SDK worker.
  rpc StopWorker(StopWorkerRequest) returns (StopWorkerResponse) {}
}
// Sent by the runner to the SDK Harness to ask for its status. More details:
// https://s.apache.org/beam-fn-api-harness-status
message WorkerStatusRequest {
  // (Required) Unique ID that identifies this request.
  string id = 1;
}
// Sent by the SDK Harness to the runner; contains the debug related status
// info.
message WorkerStatusResponse {
  // (Required) Unique ID taken from the original request.
  string id = 1;

  // (Optional) Error message, set if an exception was encountered while
  // generating the status response.
  string error = 2;

  // (Optional) Status debugging info reported by the SDK harness worker.
  // Content and format are not strongly enforced, but the text should be
  // print-friendly and appropriate as an HTTP response body for an end user.
  // For the preferred info to include in the message see
  // https://s.apache.org/beam-fn-api-harness-status
  string status_info = 3;
}
// API through which SDKs report debug-related statuses to the runner during
// pipeline execution.
service BeamFnWorkerStatus {
  // Request stream: WorkerStatusResponse messages.
  // Response stream: WorkerStatusRequest messages.
  rpc WorkerStatus(stream WorkerStatusResponse)
      returns (stream WorkerStatusRequest) {}
}