// blob: 0b179c4d3386dc118df5454a7232e78540ab6fde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
package windmill;
option java_package = "org.apache.beam.runners.dataflow.worker.windmill";
option java_outer_classname = "Windmill";
////////////////////////////////////////////////////////////////////////////////
// API Data types
// A single element carried in a message bundle (InputMessageBundle,
// KeyedMessageBundle, PubSubMessageBundle).
message Message {
  // Timestamp of the element. The default, -0x8000000000000000, is the
  // minimum int64 value. NOTE(review): units are not stated in this file —
  // confirm.
  required int64 timestamp = 1 [default = -0x8000000000000000];
  // Opaque payload of the element.
  required bytes data = 2;
  // Optional opaque metadata accompanying the payload.
  optional bytes metadata = 3;
}
// A timer for a key. Delivered to the worker in WorkItem.timers and set by
// the worker in WorkItemCommitRequest.output_timers.
message Timer {
  // Identifies the timer. NOTE(review): presumably unique within a key and
  // state_family — confirm.
  required bytes tag = 1;
  // Time at which the timer fires; defaults to the minimum int64 value.
  optional int64 timestamp = 2 [default = -0x8000000000000000];
  // Which clock `timestamp` is measured against. NOTE(review): presumably
  // the input watermark, wall-clock time, and the dependent-realtime
  // watermark respectively (cf. ComputationWorkItems) — confirm.
  enum Type {
    WATERMARK = 0;
    REALTIME = 1;
    DEPENDENT_REALTIME = 2;
  }
  optional Type type = 3 [default = WATERMARK];
  // State family the timer belongs to.
  optional string state_family = 4;
  // Defaults to the maximum int64 value. NOTE(review): semantics are not
  // documented in this file — confirm.
  optional int64 metadata_timestamp = 5 [default = 0x7fffffffffffffff];
  // Opaque payload associated with the timer.
  optional bytes metadata_payload = 6;
}
// Messages delivered to a WorkItem (WorkItem.message_bundles), grouped by the
// computation that produced them.
message InputMessageBundle {
  // Id of the computation the messages in this bundle came from.
  required string source_computation_id = 1;
  repeated Message messages = 2;
}
// Messages for a single key, emitted through OutputMessageBundle.bundles.
message KeyedMessageBundle {
  // The key all messages in this bundle belong to.
  required bytes key = 1;
  optional fixed64 sharding_key = 4;
  repeated Message messages = 2;
  // NOTE(review): presumably parallel to `messages` (one id per message) —
  // confirm.
  repeated bytes messages_ids = 3;
  // NOTE(review): presumably parallel to `messages` as well — confirm.
  repeated bytes message_offsets = 5;
}
// Time a work item spent in one processing state. Carried in
// KeyedGetDataRequest.latency_attribution,
// HeartbeatRequest.latency_attribution and
// WorkItemCommitRequest.per_work_item_latency_attributions.
message LatencyAttribution {
  // Breakdown of active processing time for one user step.
  message ActiveLatencyBreakdown {
    // Summary statistics (count/sum/min/max/mean) over a set of samples.
    // NOTE(review): units presumably milliseconds, matching
    // processing_time_millis — confirm.
    message Distribution {
      optional int64 count = 1;
      optional int64 sum = 2;
      optional int64 min = 3;
      optional int64 max = 4;
      optional int64 mean = 5;
    }
    // Metadata about the element currently being processed.
    message ActiveElementMetadata {
      // Processing time so far, in milliseconds.
      optional int64 processing_time_millis = 1;
    }
    // Name of the user step this breakdown applies to.
    optional string user_step_name = 1;
    optional Distribution processing_times_distribution = 2;
    optional ActiveElementMetadata active_message_metadata = 3;
  }
  // UNKNOWN is the default (zero) value.
  // NOTE(review): QUEUED through COMMITTING are not described in this file;
  // the names suggest the work item's lifecycle phase on the user worker —
  // confirm.
  enum State {
    UNKNOWN = 0;
    QUEUED = 1;
    ACTIVE = 2;
    READING = 3;
    COMMITTING = 4;
    // State which starts with the Windmill Worker receiving the GetWorkRequest
    // and ends with the Windmill Worker sending the GetWorkResponse to the
    // Windmill Dispatcher.
    GET_WORK_IN_WINDMILL_WORKER = 5;
    // State which starts with the Windmill Worker sending the GetWorkResponse
    // and ends with the Windmill Dispatcher receiving the GetWorkResponse.
    GET_WORK_IN_TRANSIT_TO_DISPATCHER = 6;
    // State which starts with the Windmill Dispatcher sending the
    // GetWorkResponse and ends with the user worker receiving the
    // GetWorkResponse.
    GET_WORK_IN_TRANSIT_TO_USER_WORKER = 7;
  }
  // The state this attribution covers.
  optional State state = 1;
  // Total time spent in `state`, in milliseconds.
  optional int64 total_duration_millis = 2;
  // Per-step breakdown of active processing time.
  repeated ActiveLatencyBreakdown active_latency_breakdown = 3;
}
// Metrics for one (metrics namespace, unfused step) pair. Reported via
// GlobalDataRequest.per_step_namespace_metrics.
message PerStepNamespaceMetrics {
  // The namespace of these metrics on the user worker.
  optional string metrics_namespace = 1;
  // The original system name of the unfused step that these metrics are
  // reported from.
  optional string original_step_name = 2;
  // Metrics that are recorded for this namespace and unfused step.
  repeated MetricValue metric_values = 3;
}
// A single named metric value with optional labels.
message MetricValue {
  // Name of the metric.
  optional string metric_name = 1;
  // Key/value labels further identifying this metric.
  map<string, string> metric_labels = 2;
  // The metric's value; at most one of these is set.
  oneof value {
    int64 value_int64 = 3;
    Histogram value_histogram = 4;
  }
}
// A distribution of values, modeled on google3/google/api/distribution.proto
// with the following limitations:
// - Histogram does not support explicit buckets.
// - Histogram only supports a specific type of exponential buckets.
// NOTE(review): field number 4 is unused here (it is `range` in
// distribution.proto); consider reserving it.
message Histogram {
  // Number of values recorded in this distribution.
  optional int64 count = 1;
  // The arithmetic mean of the values recorded in this distribution.
  optional double mean = 2;
  // The sum of squared deviations from the mean of the values recorded in this
  // histogram. For values x_i this is:
  // Sum[i=1..n]((x_i - mean)^2)
  optional double sum_of_squared_deviations = 3;
  // `BucketOptions` describes the bucket boundaries used in the histogram.
  message BucketOptions {
    // Linear buckets with the following boundaries for indices in 0 to n-1.
    // 0: (-inf, start)
    // i in [1, n-2]: [start + (i-1)*width, start + (i)*width)
    // n-1: [start + (n-2)*width, inf)
    // The 0th and n-1th bucket are the underflow/overflow buckets respectively.
    message Linear {
      optional int32 number_of_buckets = 1;
      optional double width = 2;
      optional double start = 3;
    }
    // Exponential buckets where the growth factor between buckets is
    // 2**(2**-scale). e.g. for 'scale=1' growth factor is 2**(2**(-1))=sqrt(2).
    // Buckets with the following boundaries for indices in 0 to n-1.
    // 0th: (-inf, 0)
    // 1st: [0, gf)
    // i in [2, n-2]: [gf^(i-1), gf^i)
    // n-1: [gf^(n-2), inf)
    // The 0th and n-1th bucket are the underflow/overflow buckets respectively.
    message Base2Exponent {
      optional int32 number_of_buckets = 1;
      optional int32 scale = 2;
    }
    // Exactly one bucketing scheme should be set.
    oneof BucketType {
      Linear linear = 1;
      Base2Exponent exponential = 2;
    }
  }
  optional BucketOptions bucket_options = 5;
  // Per-bucket counts. NOTE(review): presumably indexed by bucket index as
  // defined in BucketOptions — confirm.
  repeated int64 bucket_counts = 6 [packed = true];
}
// One timestamped event on a work item's GetWork delivery path. Carried in
// StreamingGetWorkResponseChunk.per_work_item_timing_infos.
message GetWorkStreamTimingInfo {
  enum Event {
    UNKNOWN = 0;
    // Work item creation started by the Windmill Worker.
    GET_WORK_CREATION_START = 1;
    // Work item creation finished by the Windmill Worker.
    GET_WORK_CREATION_END = 2;
    // The GetWorkResponse containing this work item is received by the Windmill
    // Dispatcher.
    GET_WORK_RECEIVED_BY_DISPATCHER = 3;
    // The GetWorkResponse containing this work item is forwarded by the
    // Windmill Dispatcher to the user worker.
    GET_WORK_FORWARDED_BY_DISPATCHER = 4;
  }
  // Critical event of the work item processing.
  optional Event event = 1;
  // Timestamp of the event, in microseconds. NOTE(review): epoch/clock not
  // stated here — confirm.
  optional int64 timestamp_usec = 2;
}
// Keyed messages emitted by a work item; carried in
// WorkItemCommitRequest.output_messages.
message OutputMessageBundle {
  // Id of the computation the messages are sent to.
  optional string destination_computation_id = 1;
  // NOTE(review): relationship to destination_computation_id is not
  // documented here — confirm.
  optional string destination_stream_id = 3;
  repeated KeyedMessageBundle bundles = 2;
}
// Messages to publish to a Pub/Sub topic; carried in
// WorkItemCommitRequest.pubsub_messages.
message PubSubMessageBundle {
  // Destination topic.
  required string topic = 1;
  repeated Message messages = 2;
  // NOTE(review): presumably the attribute name used to carry each message's
  // timestamp — confirm.
  optional string timestamp_label = 3;
  // NOTE(review): presumably the attribute name used to carry each message's
  // unique id — confirm.
  optional string id_label = 4;
  // If set, then each message is interpreted as a PubsubMessage proto,
  // containing a payload and attributes.
  optional bool with_attributes = 5;
}
// A group of timers; used as WorkItem.timers.
message TimerBundle {
  repeated Timer timers = 1;
}
// A timestamped opaque value held in a TagValue.
message Value {
  // Defaults to the minimum int64 value.
  required int64 timestamp = 1 [default = -0x8000000000000000];
  // Opaque encoded value.
  required bytes data = 2;
}
// A single value-state cell addressed by (state_family, tag). Used to request
// values (KeyedGetDataRequest.values_to_fetch), return them
// (KeyedGetDataResponse.values) and write them
// (WorkItemCommitRequest.value_updates).
message TagValue {
  required bytes tag = 1;
  // NOTE(review): whether an unset value in an update means deletion is not
  // stated in this file — confirm.
  optional Value value = 2;
  optional string state_family = 3;
}
// All value-state tags in a state family that start with tag_prefix. Used in
// WorkItemCommitRequest.tag_value_prefix_deletes.
message TagValuePrefix {
  optional bytes tag_prefix = 1;
  optional string state_family = 2;
}
// Request to fetch the value-state cells whose tag starts with tag_prefix.
// Used in KeyedGetDataRequest.tag_value_prefixes_to_fetch; answered by a
// TagValuePrefixResponse.
message TagValuePrefixRequest {
  optional bytes tag_prefix = 1;
  optional string state_family = 2;
  // A continuation_position previously returned in a TagValuePrefixResponse.
  // Indicates we wish to fetch the next page of values. The response echoes
  // this value in TagValuePrefixResponse.request_position.
  optional bytes request_position = 3;
  // Limits the size of the fetched values to this byte limit. The default,
  // 0x6400000, is 104857600 bytes (100 MiB).
  optional int64 fetch_max_bytes = 4 [default = 0x6400000];
}
// Response to a TagValuePrefixRequest; returned in
// KeyedGetDataResponse.tag_value_prefixes.
message TagValuePrefixResponse {
  optional bytes tag_prefix = 1;
  optional string state_family = 2;
  // Matching cells in this page.
  repeated TagValue tag_values = 3;
  // Pass as request_position in a follow-up request to fetch the next page.
  optional bytes continuation_position = 4;
  // Copied from the request.
  optional bytes request_position = 5;
}
// Bag state addressed by (state_family, tag). Used for fetches
// (KeyedGetDataRequest.bags_to_fetch / KeyedGetDataResponse.bags) and writes
// (WorkItemCommitRequest.bag_updates).
message TagBag {
  optional bytes tag = 1;
  // In request: All existing items in the bag will be deleted. If new values
  // are present they will be written.
  optional bool delete_all = 2;
  // In request: The given values will be written to the collection.
  // In response: Values that are present in the collection.
  repeated bytes values = 3;
  optional string state_family = 4;
  // In request: A previously returned continuation_position from an earlier
  // read response. Indicates we wish to fetch the next page of values.
  // In response: Copied from request.
  optional int64 request_position = 7;
  // In response only: Set when there are values after those returned above, but
  // they were suppressed to respect the fetch_max_bytes limit. Subsequent
  // requests should copy this to request_position to retrieve the next page of
  // values.
  optional int64 continuation_position = 5;
  // In request: Limits the size of the fetched bag to this byte limit. The
  // default is the maximum int64 value.
  optional int64 fetch_max_bytes = 6 [default = 0x7fffffffffffffff];
}
// For a given sharding key and state family, a TagMultimap is a collection of
// `entry_name, bag of values` pairs; each pair is a TagMultimapEntry.
//
// The request_position, continuation_position and fetch_max_bytes fields in
// TagMultimapEntry are used for the pagination and byte limiting of individual
// entry fetch requests get(entry_name). The same fields in
// TagMultimapFetchRequest and TagMultimapFetchResponse are used for full
// multimap fetch requests entry_names() and entries().
// Do not use both forms of pagination in a single TagMultimapFetchRequest.
message TagMultimapEntry {
  optional bytes entry_name = 1;
  // In update request: if true all values associated with this entry_name will
  // be deleted. If new values are present they will be written.
  optional bool delete_all = 2;
  // In update request: The given values will be added to the collection and
  // associated with entry_name.
  // In fetch response: Values that are associated with this entry_name in the
  // multimap.
  repeated bytes values = 3;
  // In fetch request: A previously returned continuation_position from an
  // earlier read response. Indicates we wish to fetch the next page of values.
  // If this is the first request, leave unset.
  // In fetch response: copied from request.
  optional int64 request_position = 4;
  // In fetch response: Set when there are values after those returned above,
  // but they were suppressed to respect the fetch_max_bytes limit. Subsequent
  // requests should copy this to request_position to retrieve the next page of
  // values.
  optional int64 continuation_position = 5;
  // In fetch request: Limits the size of the fetched values to this byte limit.
  // A lower limit may be imposed by the service.
  optional int64 fetch_max_bytes = 6 [default = 0x7fffffffffffffff];
}
// Request to fetch a multimap addressed by (state_family, tag). Used in
// KeyedGetDataRequest.multimaps_to_fetch.
message TagMultimapFetchRequest {
  optional bytes tag = 1;
  optional string state_family = 2;
  // If true, values will be omitted in the response.
  optional bool fetch_entry_names_only = 3;
  // Limits the size of the fetched entries to this byte limit. A lower limit
  // may be imposed by the service.
  optional int64 fetch_max_bytes = 4 [default = 0x7fffffffffffffff];
  // A previously returned continuation_position from an earlier fetch response.
  // Indicates we wish to fetch the next page of entries. If this is the first
  // request, leave unset.
  optional bytes request_position = 5;
  // Fetch the requested subset of entries only. Will fetch all entries if left
  // empty. Entries in entries_to_fetch should only have the entry_name,
  // request_position and fetch_max_bytes set.
  repeated TagMultimapEntry entries_to_fetch = 6;
}
// Response to a TagMultimapFetchRequest; returned in
// KeyedGetDataResponse.tag_multimaps.
message TagMultimapFetchResponse {
  optional bytes tag = 1;
  optional string state_family = 2;
  repeated TagMultimapEntry entries = 3;
  // Will be set only if entries_to_fetch in the request was empty, i.e. when
  // fetching all entries.
  optional bytes continuation_position = 4;
  // Request position copied from request.
  optional bytes request_position = 5;
}
// Mutations to a multimap addressed by (state_family, tag). Used in
// WorkItemCommitRequest.multimap_updates.
message TagMultimapUpdateRequest {
  optional bytes tag = 1;
  optional string state_family = 2;
  // All entries including the values in the multimap will be deleted.
  optional bool delete_all = 3;
  // If delete_all is true and updates are not empty, the multimap will first be
  // cleared and then those updates will be applied.
  repeated TagMultimapEntry updates = 4;
}
// A single entry in a sorted list.
message SortedListEntry {
  // The value payload.
  optional bytes value = 1;
  // The sort key. Must be strictly smaller than 0x7fffffffffffffff.
  optional int64 sort_key = 2;
  // A unique id for this element. The combination of the sort key and the id
  // identifies an element. Entries written with the same sort_key and id will
  // overwrite the previous entry.
  optional uint64 id = 3;
}
// A half-open range of sort keys. The defaults (minimum and maximum int64)
// cover every valid sort key.
message SortedListRange {
  // start and limit describe the range of SortedList entries
  // to be fetched. The targeted range is [start, limit).
  optional int64 start = 1 [default = -0x8000000000000000];
  optional int64 limit = 2 [default = 0x7fffffffffffffff];
}
// Request to fetch ranges of a sorted list addressed by (state_family, tag).
// Used in KeyedGetDataRequest.sorted_lists_to_fetch.
message TagSortedListFetchRequest {
  optional bytes tag = 1;
  optional string state_family = 2;
  repeated SortedListRange fetch_ranges = 3;
  // Sets a limit on the maximum response value bytes.
  optional int64 fetch_max_bytes = 5 [default = 0x7fffffffffffffff];
  // If a previous TagSortedListFetchRequest returned a continuation_position,
  // then passing that token into request_position will cause the subsequent
  // request to continue the previous request.
  optional bytes request_position = 6;
}
// Response to a TagSortedListFetchRequest; returned in
// KeyedGetDataResponse.tag_sorted_lists.
message TagSortedListFetchResponse {
  optional bytes tag = 1;
  optional string state_family = 2;
  repeated SortedListEntry entries = 3;
  // Pass as request_position in a follow-up request to continue the fetch.
  optional bytes continuation_position = 4;
  // Fetch ranges copied from request.
  repeated SortedListRange fetch_ranges = 5;
  // Request position copied from request.
  optional bytes request_position = 6;
}
// Mutations to a sorted list addressed by (state_family, tag). Used in
// WorkItemCommitRequest.sorted_list_updates.
message TagSortedListUpdateRequest {
  optional bytes tag = 1;
  optional string state_family = 2;
  // The list of inserts and deletes to be applied. Deletes always happen before
  // inserts. Inserts with the same sort_key+id overwrite each other.
  repeated TagSortedListDeleteRequest deletes = 3;
  repeated TagSortedListInsertRequest inserts = 4;
}
// Entries to insert into a sorted list.
message TagSortedListInsertRequest {
  repeated SortedListEntry entries = 1;
}
// Deletes the entries of a sorted list whose sort keys fall in `range`.
message TagSortedListDeleteRequest {
  optional SortedListRange range = 1;
}
// Identifies one version of a named piece of global data.
message GlobalDataId {
  required string tag = 1;
  required bytes version = 2;
}
// Contents of a piece of global data. Returned in GetDataResponse.global_data
// and written via WorkItemCommitRequest.global_data_updates.
message GlobalData {
  required GlobalDataId data_id = 1;
  // NOTE(review): presumably `data` is only meaningful when is_ready is
  // true — confirm.
  optional bool is_ready = 2;
  optional bytes data = 3;
  optional string state_family = 4;
}
// Source state for a work item; appears in WorkItem.source_state and
// WorkItemCommitRequest.source_state_updates.
message SourceState {
  // Opaque serialized source state.
  optional bytes state = 1;
  // NOTE(review): presumably ids of finalizations to perform — confirm.
  repeated fixed64 finalize_ids = 2;
  optional bool only_finalize = 3;
  optional bytes offset_limit = 4;
}
// Watermark hold timestamps stored under (state_family, tag). Fetched via
// KeyedGetDataRequest.watermark_holds_to_fetch and written via
// WorkItemCommitRequest.watermark_holds.
message WatermarkHold {
  required bytes tag = 1;
  repeated int64 timestamps = 2 [packed = true];
  // NOTE(review): presumably clears existing holds for the tag before
  // `timestamps` are applied — confirm.
  optional bool reset = 3;
  optional string state_family = 4;
}
// Proto describing a hot key detected on a given WorkItem.
message HotKeyInfo {
  // The age of the hot key measured from when it was first detected, in
  // microseconds.
  optional int64 hot_key_age_usec = 1;
}
// A unit of work for a single key, delivered in ComputationWorkItems.work.
message WorkItem {
  required bytes key = 1;
  // NOTE(review): the same token appears in KeyedGetDataRequest,
  // HeartbeatRequest and WorkItemCommitRequest; presumably identifies this
  // work item in those calls — confirm.
  required fixed64 work_token = 2;
  optional fixed64 sharding_key = 9;
  optional fixed64 cache_token = 7;
  repeated InputMessageBundle message_bundles = 3;
  optional TimerBundle timers = 4;
  repeated GlobalDataId global_data_id_notifications = 5;
  optional SourceState source_state = 6;
  // Defaults to the minimum int64 value.
  optional int64 output_data_watermark = 8 [default = -0x8000000000000000];
  // Indicates that this is a new key with no data associated. This allows
  // the harness to optimize data fetching.
  optional bool is_new_key = 10;
  // A hot key is a symptom of poor data distribution in which there are enough
  // elements mapped to a single key to impact pipeline performance. When
  // present, this field includes metadata associated with any hot key.
  optional HotKeyInfo hot_key_info = 11;
}
// Work items for one computation, returned in GetWorkResponse.work.
message ComputationWorkItems {
  required string computation_id = 1;
  repeated WorkItem work = 2;
  // Watermarks default to the minimum int64 value.
  optional int64 input_data_watermark = 3 [default = -0x8000000000000000];
  optional int64 dependent_realtime_input_watermark = 4
      [default = -0x8000000000000000];
  // NOTE(review): field number 5 is unused and not reserved — confirm
  // whether it should be.
  optional bool drain_mode = 6;
}
////////////////////////////////////////////////////////////////////////////////
// API calls
// GetWork
// Request for the GetWork RPC (and the first message of a GetWork stream).
message GetWorkRequest {
  required fixed64 client_id = 1;
  optional string worker_id = 4;
  optional string job_id = 5;
  optional string project_id = 7;
  // Maximum number of work items; default 0xffffffff (4294967295).
  optional int64 max_items = 2 [default = 0xffffffff];
  // Maximum bytes of work; default is the maximum int64 value.
  optional int64 max_bytes = 3 [default = 0x7fffffffffffffff];
  reserved 6;
}
// Response for the GetWork RPC: work grouped by computation.
message GetWorkResponse {
  repeated ComputationWorkItems work = 1;
}
// GetData
// State to fetch for one key of a work item.
message KeyedGetDataRequest {
  optional bytes key = 1;
  required fixed64 work_token = 2;
  optional fixed64 sharding_key = 6;
  optional fixed64 cache_token = 11;
  repeated TagValue values_to_fetch = 3;
  repeated TagValuePrefixRequest tag_value_prefixes_to_fetch = 10;
  repeated TagBag bags_to_fetch = 8;
  // Must be at most one sorted_lists_to_fetch entry for a given state family
  // and tag.
  repeated TagSortedListFetchRequest sorted_lists_to_fetch = 9;
  // Must be at most one multimaps_to_fetch entry for a given state family and
  // tag.
  repeated TagMultimapFetchRequest multimaps_to_fetch = 12;
  repeated WatermarkHold watermark_holds_to_fetch = 5;
  repeated LatencyAttribution latency_attribution = 13;
  // NOTE(review): presumably caps the response size for this key — confirm.
  optional int64 max_bytes = 7;
  reserved 4;
}
// Keyed state requests for one computation.
message ComputationGetDataRequest {
  required string computation_id = 1;
  repeated KeyedGetDataRequest requests = 2;
}
// Request for the GetData RPC.
message GetDataRequest {
  optional string job_id = 4;
  optional string project_id = 5;
  repeated ComputationGetDataRequest requests = 1;
  repeated GlobalDataRequest global_data_fetch_requests = 3;
  // Assigned worker id for the instance.
  optional string worker_id = 6;
  // SE only. Will only be set by compatible client
  repeated ComputationHeartbeatRequest computation_heartbeat_request = 7;
  // DEPRECATED. NOTE(review): presumably superseded by
  // global_data_fetch_requests — confirm.
  repeated GlobalDataId global_data_to_fetch = 2;
}
// Fetched state for one key; the counterpart of KeyedGetDataRequest.
message KeyedGetDataResponse {
  required bytes key = 1;
  optional fixed64 sharding_key = 7;
  // The response for this key is not populated due to the fetch failing.
  optional bool failed = 2;
  repeated TagValue values = 3;
  repeated TagValuePrefixResponse tag_value_prefixes = 9;
  repeated TagBag bags = 6;
  // There is one TagSortedListFetchResponse per state-family, tag pair.
  repeated TagSortedListFetchResponse tag_sorted_lists = 8;
  // There is one TagMultimapFetchResponse per state-family, tag pair.
  repeated TagMultimapFetchResponse tag_multimaps = 10;
  repeated WatermarkHold watermark_holds = 5;
  reserved 4;
}
// Keyed state responses for one computation.
message ComputationGetDataResponse {
  required string computation_id = 1;
  repeated KeyedGetDataResponse data = 2;
}
// Response for the GetData RPC.
message GetDataResponse {
  repeated ComputationGetDataResponse data = 1;
  repeated GlobalData global_data = 2;
  // Only set if ComputationHeartbeatRequest was sent, prior versions do not
  // expect a response for heartbeats. SE only.
  repeated ComputationHeartbeatResponse computation_heartbeat_response = 3;
}
// Heartbeats
//
// Heartbeats are sent over the GetData stream in Streaming Engine. Each
// heartbeat identifies a work item that the user worker has previously
// received from GetWork but not yet committed with CommitWork.
// Note that implicit heartbeats not expecting a response may be sent as
// special KeyedGetDataRequests; see function KeyedGetDataRequestIsHeartbeat.
// SE only.
message HeartbeatRequest {
  // NOTE(review): sharding_key, work_token and cache_token presumably echo
  // the corresponding WorkItem fields to identify the work — confirm.
  optional fixed64 sharding_key = 1;
  optional fixed64 work_token = 2;
  optional fixed64 cache_token = 3;
  // Latency observed so far for the work item.
  repeated LatencyAttribution latency_attribution = 4;
}
// Responses for heartbeat requests, indicating which work is no longer valid
// on the windmill worker and may be dropped/cancelled in the client.
// SE only.
message HeartbeatResponse {
  // Identify the work item, as in HeartbeatRequest.
  optional fixed64 sharding_key = 1;
  optional fixed64 work_token = 2;
  optional fixed64 cache_token = 3;
  // NOTE(review): presumably true when the identified work is no longer
  // valid — confirm.
  optional bool failed = 4;
}
// Heartbeats for work items of one computation.
message ComputationHeartbeatRequest {
  optional string computation_id = 1;
  repeated HeartbeatRequest heartbeat_requests = 2;
}
// Heartbeat responses for work items of one computation.
message ComputationHeartbeatResponse {
  optional string computation_id = 1;
  repeated HeartbeatResponse heartbeat_responses = 2;
}
// CommitWork
// A metric update, reported in WorkItemCommitRequest.counter_updates or
// ReportStatsRequest.counter_updates.
message Counter {
  optional string name = 1;
  // Aggregation kind of this counter.
  enum Kind {
    SUM = 0;
    MAX = 1;
    MIN = 2;
    MEAN = 3;
  }
  optional Kind kind = 2;
  // For SUM, MAX, MIN and MEAN at most one of the following should be set.
  // For MEAN it is the sum.
  optional double double_scalar = 3;
  optional int64 int_scalar = 4;
  // Only set for MEAN. Count of elements contributing to the sum.
  optional int64 mean_count = 6;
  // True if this metric is reported as the total cumulative aggregate
  // value accumulated since the worker started working on this WorkItem.
  // By default this is false, indicating that this metric is reported
  // as a delta that is not associated with any WorkItem.
  optional bool cumulative = 7;
}
// Request concerning a piece of global data identified by data_id. Sent in
// GetDataRequest.global_data_fetch_requests,
// StreamingGetDataRequest.global_data_request and
// WorkItemCommitRequest.global_data_requests.
message GlobalDataRequest {
  required GlobalDataId data_id = 1;
  // Defaults to the maximum int64 value. NOTE(review): semantics are not
  // documented in this file — confirm.
  optional int64 existence_watermark_deadline = 2
      [default = 0x7fffffffffffffff];
  optional string state_family = 3;
  // Computation Id for this GlobalDataRequest. Only set for heartbeats.
  optional string computation_id = 4;
  // Dataflow defined metrics keyed by metrics namespace and unfused step name.
  // All unfused steps in this list belong to the fused stage that
  // computation_id refers to. Only set for heartbeats.
  repeated PerStepNamespaceMetrics per_step_namespace_metrics = 5;
}
// next id: 28
// The results of processing one WorkItem, committed via CommitWork (or a
// StreamingCommitRequestChunk). key/work_token/sharding_key/cache_token
// correspond to the fields of the same name on WorkItem.
message WorkItemCommitRequest {
  required bytes key = 1;
  required fixed64 work_token = 2;
  optional fixed64 sharding_key = 15;
  optional fixed64 cache_token = 16;
  optional bool exceeds_max_work_item_commit_bytes = 20;
  optional int64 estimated_work_item_commit_bytes = 21;
  // Output produced by this work item.
  repeated OutputMessageBundle output_messages = 3;
  repeated PubSubMessageBundle pubsub_messages = 7;
  repeated Timer output_timers = 4;
  // State mutations.
  repeated TagValue value_updates = 5;
  repeated TagValuePrefix tag_value_prefix_deletes = 25;
  repeated TagBag bag_updates = 18;
  repeated TagSortedListUpdateRequest sorted_list_updates = 24;
  repeated TagMultimapUpdateRequest multimap_updates = 26;
  repeated Counter counter_updates = 8;
  repeated GlobalDataRequest global_data_requests = 11;
  repeated GlobalData global_data_updates = 10;
  // Source progress.
  optional SourceState source_state_updates = 12;
  optional int64 source_watermark = 13 [default = -0x8000000000000000];
  // Default -1. NOTE(review): presumably means "unknown" — confirm.
  optional int64 source_backlog_bytes = 17 [default = -1];
  optional int64 source_bytes_processed = 22;
  repeated WatermarkHold watermark_holds = 14;
  // Collected work item processing state durations.
  repeated LatencyAttribution per_work_item_latency_attributions = 27;
  // DEPRECATED
  repeated GlobalDataId global_data_id_requests = 9;
  reserved 6, 19, 23;
}
// Commits for work items of one computation.
message ComputationCommitWorkRequest {
  required string computation_id = 1;
  repeated WorkItemCommitRequest requests = 2;
}
// Request for the CommitWork RPC.
message CommitWorkRequest {
  optional string job_id = 2;
  optional string project_id = 3;
  repeated ComputationCommitWorkRequest requests = 1;
}
// Validation status of applying a single WorkItem.
// Statuses earlier in this list supersede later ones.
// Reported per request in StreamingCommitResponse.status.
enum CommitStatus {
  // Default proto value; do not use.
  DEFAULT = 0;
  // Commit succeeded.
  OK = 1;
  // Some requested resource to modify was not found.
  NOT_FOUND = 2;
  // Some part of this request exceeded size limits.
  TOO_LARGE = 3;
  // Token for this commit is invalid.
  INVALID_TOKEN = 4;
  // This key and token is already committing.
  ALREADY_IN_COMMIT = 5;
  // The CommitWork request had validation errors.
  VALIDATION_FAILED = 6;
  // The request was aborted.
  ABORTED = 7;
}
// Response for the CommitWork RPC. Unlike StreamingCommitResponse, it carries
// no per-request status.
message CommitWorkResponse {}
// Configuration
// Request for the GetConfig RPC.
message GetConfigRequest {
  optional string job_id = 2;
  // Computations to fetch configuration for.
  repeated string computations = 1;
}
// Configuration for one computation.
message ComputationConfig {
  // Map from user name of stateful transforms in this stage to their state
  // family.
  message TransformUserNameToStateFamilyEntry {
    optional string transform_user_name = 1;
    optional string state_family = 2;
  }
  repeated TransformUserNameToStateFamilyEntry
      transform_user_name_to_state_family = 1;
}
// Response for the GetConfig RPC. Maps are encoded as repeated entry messages.
message GetConfigResponse {
  // NOTE(review): meaning of cloud_works is not documented in this file —
  // confirm.
  repeated string cloud_works = 1;
  message NameMapEntry {
    optional string user_name = 1;
    optional string system_name = 2;
  }
  // Map of user names to system names
  repeated NameMapEntry name_map = 2;
  message SystemNameToComputationIdMapEntry {
    optional string system_name = 1;
    optional string computation_id = 2;
  }
  // Map of system names to computation ids.
  repeated SystemNameToComputationIdMapEntry
      system_name_to_computation_id_map = 3;
  // Map of computation id to ComputationConfig.
  message ComputationConfigMapEntry {
    optional string computation_id = 1;
    optional ComputationConfig computation_config = 2;
  }
  repeated ComputationConfigMapEntry computation_config_map = 4;
}
// Reporting
// An exception reported via ReportStatsRequest.exceptions.
message Exception {
  repeated string stack_frames = 1;
  // The underlying cause. The type is self-recursive, so a cause chain is
  // bounded only by the parser's recursion limit.
  optional Exception cause = 2;
}
// Request for the ReportStats RPC: exceptions and counter updates, optionally
// scoped to a computation and work item.
message ReportStatsRequest {
  optional string job_id = 6;
  optional string computation_id = 1;
  optional bytes key = 2;
  optional fixed64 work_token = 3;
  optional fixed64 sharding_key = 7;
  repeated Exception exceptions = 4;
  repeated Counter counter_updates = 5;
}
// Response for the ReportStats RPC.
message ReportStatsResponse {
  // NOTE(review): semantics of `failed` are not documented here — confirm.
  optional bool failed = 1;
}
////////////////////////////////////////////////////////////////////////////////
// Streaming API
// A message on the client side of a GetWork stream.
message StreamingGetWorkRequest {
  oneof chunk_type {
    GetWorkRequest request = 1;
    // Initial message is GetWorkRequest with subsequent messages being
    // extensions.
    StreamingGetWorkRequestExtension request_extension = 2;
  }
  // Ignored after initial request.
  optional bool supports_multiple_work_items_in_chunk = 5 [default = false];
}
// Follow-up message on a GetWork stream. Defaults match the corresponding
// GetWorkRequest fields.
message StreamingGetWorkRequestExtension {
  optional int64 max_items = 1 [default = 0xffffffff];
  optional int64 max_bytes = 2 [default = 0x7fffffffffffffff];
}
// A chunk of serialized work items on the server side of a GetWork stream.
message StreamingGetWorkResponseChunk {
  // Not necessary for routing, but allows for optimization that if this is
  // unset it is the same as the previous chunk.
  optional ComputationWorkItemMetadata computation_metadata = 1;
  // NOTE(review): presumably the number of bytes of the last
  // serialized_work_item still to arrive in later chunks, as with the
  // remaining_bytes fields of the other streaming messages — confirm.
  optional int64 remaining_bytes_for_work_item = 2;
  // The serialized bytes are a serialized WorkItem.
  repeated bytes serialized_work_item = 3;
  // Indicates which response stream this message corresponds to. A sequence of
  // responses for the same stream_id should be deserialized together. Responses
  // from other stream_ids may be interleaved on the physical stream.
  optional fixed64 stream_id = 4;
  // Timing infos for the work item. Windmill Dispatcher and user worker should
  // propagate critical event timings if the list is not empty.
  repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;
  // Field 5 must not be reused.
  reserved 5;
}
// Per-computation metadata for streamed work items. It mirrors the
// non-work fields of ComputationWorkItems (with different field numbers).
message ComputationWorkItemMetadata {
  optional string computation_id = 1;
  optional int64 input_data_watermark = 2 [default = -0x8000000000000000];
  optional int64 dependent_realtime_input_watermark = 3
      [default = -0x8000000000000000];
  // NOTE(review): field number 4 is unused and not reserved — confirm.
  optional bool drain_mode = 5;
}
// A message on the client side of a GetData stream.
message StreamingGetDataRequest {
  // The first request on the stream should consist solely of the header. All
  // subsequent requests should not have the header set.
  optional JobHeader header = 2;
  // A request_id for each data request should be unique within a stream. All
  // response items corresponding to a data request will be identified by the
  // same request_id.
  // request_id field enumerates global data requests before state requests.
  // REQUIRES:
  //   request_id.size() = global_data_request.size() + state_request.size()
  //     for non-heartbeats
  //   request_id.empty() for heartbeats
  repeated fixed64 request_id = 1;
  repeated GlobalDataRequest global_data_request = 3;
  repeated ComputationGetDataRequest state_request = 4;
  // Will only be set by compatible client
  repeated ComputationHeartbeatRequest computation_heartbeat_request = 5;
}
// A message on the server side of a GetData stream.
message StreamingGetDataResponse {
  // Indicates which request each serialized_response corresponds to.
  // Responses to different requests may be interleaved on the physical
  // stream.
  repeated fixed64 request_id = 1;
  // Each entry is either a serialized KeyedGetDataResponse or a serialized
  // GlobalData.
  repeated bytes serialized_response = 2;
  // Remaining bytes field applies only to the last serialized_response
  optional int64 remaining_bytes_for_response = 3;
  // Only set if ComputationHeartbeatRequest was sent, prior versions do not
  // expect a response for heartbeats.
  repeated ComputationHeartbeatResponse computation_heartbeat_response = 5;
  reserved 4;
}
// A message on the client side of a CommitWork stream.
message StreamingCommitWorkRequest {
  // The first request on the stream should consist solely of the header. All
  // subsequent requests should not have the header set.
  optional JobHeader header = 1;
  repeated StreamingCommitRequestChunk commit_chunk = 2;
}
// Identifies the job and client; sent as the first message of GetData and
// CommitWork streams and in WorkerMetadataRequest.
message JobHeader {
  optional string job_id = 1;
  optional string project_id = 2;
  // Worker id is meant for logging only. Do not rely on it for other decisions.
  optional string worker_id = 3;
  optional fixed64 client_id = 4;
  optional string region_id = 5;
  // Used by the user worker to communicate to a specific windmill worker. This
  // is initially passed to the user worker via GetWorkerMetadata.
  optional string backend_worker_token = 6;
}
// One chunk of a (possibly multi-chunk) commit on a CommitWork stream.
message StreamingCommitRequestChunk {
  // A request_id for each complete request (a set of chunks) should be unique
  // within a stream. The response corresponding to a request will be identified
  // by the same request_id.
  optional fixed64 request_id = 1;
  // Routing-level fields. Within a stream we can support the optimization that
  // an unset field has the same value as the previous chunk. This provides
  // similar benefit to our current batching without requiring separate batching
  // from transport implementation.
  optional string computation_id = 2;
  optional fixed64 sharding_key = 3;
  // Payload-level fields. The serialized bytes are a serialized
  // WorkItemCommitRequest. The request is serialized to avoid parsing in the
  // dispatcher and to allow for requests for which single fields exceed the
  // byte limit we wish to enforce for message size. The remaining bytes field
  // is used to indicate this continuation. Workers will perform a streaming
  // parse of serialized bytes to generate the complete WorkItemCommitRequest
  // before handing off to the WindmillHost for processing.
  optional int64 remaining_bytes_for_work_item = 4;
  optional bytes serialized_work_item_commit = 5;
}
// A message on the server side of a CommitWork stream.
message StreamingCommitResponse {
  // Indicates which request this message corresponds to.
  repeated fixed64 request_id = 1;
  // Commit status for each request_id in the message. Indices must line up
  // with the request_id field, but trailing OKs may be omitted.
  repeated CommitStatus status = 2;
}
// Request for worker metadata (answered by WorkerMetadataResponse).
message WorkerMetadataRequest {
  optional JobHeader header = 1;
}
// Converted into
// org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints
// used to connect to Streaming Engine.
message WorkerMetadataResponse {
  // The metadata version increases with every modification. Within a single
  // stream it will always be increasing. The version may be used across streams
  // to ensure that the view of the metadata does not move backwards.
  optional int64 metadata_version = 1;
  // A streaming engine windmill worker endpoint.
  message Endpoint {
    // IPv6 address of a streaming engine windmill worker.
    optional string direct_endpoint = 1;
    // Token identifying the backend worker (see JobHeader).
    optional string backend_worker_token = 2;
    optional int64 port = 3;
  }
  // Endpoints that should be used for requesting work with GetWorkStream.
  // Additional data for returned work should be fetched from the endpoint with
  // GetDataStream. The work should be committed to the endpoint with
  // CommitWorkStream. Each response on this stream replaces the previous, and
  // connections to endpoints that are no longer present should be closed.
  repeated Endpoint work_endpoints = 2;
  // Maps from GlobalData tag to the endpoint that should be used for GetData
  // calls to retrieve that global data.
  map<string, Endpoint> global_data_endpoints = 3;
  // Used to set gRPC authority.
  optional string external_endpoint = 5;
  enum EndpointType {
    UNKNOWN = 0;
    CLOUDPATH = 1;
    BORG = 2;
    DIRECTPATH = 3;
  }
  // Defaults to CLOUDPATH (not UNKNOWN) when unset.
  optional EndpointType endpoint_type = 6 [default = CLOUDPATH];
  reserved 4;
}
// Client-side settings for gRPC flow control set on the user worker when
// constructing the channels and stubs.
message UserWorkerGrpcFlowControlSettings {
  // If true, the user worker will use gRPC's automatic flow control for
  // windmill RPCs.
  optional bool enable_auto_flow_control = 1 [default = false];
  // The flow control window size for windmill RPCs. If
  // enable_auto_flow_control is true, this is the initial window size and may
  // be resized by the gRPC framework. Default and minimum is 10MiB
  // (10485760 bytes).
  optional int32 flow_control_window_bytes = 2 [default = 10485760];
  // Specifies how many bytes must be queued before the call is considered not
  // ready to send more messages.
  optional int32 on_ready_threshold_bytes = 3;
}
// How the user worker connects to Streaming Engine; used in
// UserWorkerRunnerV1Settings.connectivity_type.
enum ConnectivityType {
  // Default (zero) value.
  CONNECTIVITY_TYPE_DEFAULT = 0;
  CONNECTIVITY_TYPE_CLOUDPATH = 1;
  CONNECTIVITY_TYPE_DIRECTPATH = 2;
}
// Settings to control runtime behavior of the java runner v1 user worker.
message UserWorkerRunnerV1Settings {
  optional UserWorkerGrpcFlowControlSettings flow_control_settings = 3;
  optional ConnectivityType connectivity_type = 4
      [default = CONNECTIVITY_TYPE_DEFAULT];
  // Numbers of removed fields; do not reuse.
  reserved 1, 2;
}
// Unary (non-streaming) Windmill API.
service WindmillAppliance {
  // Gets streaming Dataflow work.
  rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse);
  // Gets data from Windmill.
  rpc GetData(.windmill.GetDataRequest) returns (.windmill.GetDataResponse);
  // Commits previously acquired work.
  rpc CommitWork(.windmill.CommitWorkRequest)
      returns (.windmill.CommitWorkResponse);
  // Gets dependent configuration from windmill.
  rpc GetConfig(.windmill.GetConfigRequest)
      returns (.windmill.GetConfigResponse);
  // Reports stats to windmill.
  rpc ReportStats(.windmill.ReportStatsRequest)
      returns (.windmill.ReportStatsResponse);
}