| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| syntax = "proto2"; |
| |
| package windmill; |
| |
| option java_package = "org.apache.beam.runners.dataflow.worker.windmill"; |
| option java_outer_classname = "Windmill"; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // API Data types |
| |
| message Message { |
| required int64 timestamp = 1 [default = -0x8000000000000000]; |
| required bytes data = 2; |
| optional bytes metadata = 3; |
| } |
| |
| message Timer { |
| required bytes tag = 1; |
| optional int64 timestamp = 2 [default = -0x8000000000000000]; |
| enum Type { |
| WATERMARK = 0; |
| REALTIME = 1; |
| DEPENDENT_REALTIME = 2; |
| } |
| optional Type type = 3 [default = WATERMARK]; |
| optional string state_family = 4; |
| optional int64 metadata_timestamp = 5 [default = 0x7fffffffffffffff]; |
| optional bytes metadata_payload = 6; |
| } |
| |
| message InputMessageBundle { |
| required string source_computation_id = 1; |
| repeated Message messages = 2; |
| } |
| |
| message KeyedMessageBundle { |
| required bytes key = 1; |
| optional fixed64 sharding_key = 4; |
| repeated Message messages = 2; |
| repeated bytes messages_ids = 3; |
| repeated bytes message_offsets = 5; |
| } |
| |
| message LatencyAttribution { |
| message ActiveLatencyBreakdown { |
| message Distribution { |
| optional int64 count = 1; |
| optional int64 sum = 2; |
| optional int64 min = 3; |
| optional int64 max = 4; |
| optional int64 mean = 5; |
| } |
| message ActiveElementMetadata { |
| optional int64 processing_time_millis = 1; |
| } |
| optional string user_step_name = 1; |
| optional Distribution processing_times_distribution = 2; |
| optional ActiveElementMetadata active_message_metadata = 3; |
| } |
| enum State { |
| UNKNOWN = 0; |
| QUEUED = 1; |
| ACTIVE = 2; |
| READING = 3; |
| COMMITTING = 4; |
| // State which starts with the Windmill Worker receiving the GetWorkRequest |
| // and ends with the Windmill Worker sending the GetWorkResponse to the |
| // Windmill Dispatcher. |
| GET_WORK_IN_WINDMILL_WORKER = 5; |
| // State which starts with the Windmill Worker sending the GetWorkResponse |
| // and ends with the Windmill Dispatcher receiving the GetWorkResponse. |
| GET_WORK_IN_TRANSIT_TO_DISPATCHER = 6; |
| // State which starts with the Windmill Dispatcher sending the |
| // GetWorkResponse and ends with the user worker receiving the |
| // GetWorkResponse. |
| GET_WORK_IN_TRANSIT_TO_USER_WORKER = 7; |
| } |
| optional State state = 1; |
| optional int64 total_duration_millis = 2; |
| repeated ActiveLatencyBreakdown active_latency_breakdown = 3; |
| } |
| |
| message PerStepNamespaceMetrics { |
| // The namespace of these metrics on the user worker. |
| optional string metrics_namespace = 1; |
| // The original system name of the unfused step that these metrics are |
| // reported from. |
| optional string original_step_name = 2; |
| // Metrics that are recorded for this namespace and unfused step. |
| repeated MetricValue metric_values = 3; |
| } |
| |
| message MetricValue { |
| optional string metric_name = 1; |
| map<string, string> metric_labels = 2; |
| oneof value { |
| int64 value_int64 = 3; |
| Histogram value_histogram = 4; |
| } |
| } |
| |
| // google3/google/api/distribution.proto with the following limitations. |
| // Histogram does not support explicit buckets. |
| // Histogram only supports a specific type of exponential buckets. |
| message Histogram { |
| // Number of values recorded in this distribution. |
| optional int64 count = 1; |
| // The arithmetic mean of the values recorded in this distribution. |
| optional double mean = 2; |
| // The sum of squared deviations from the mean of the values recorded in this |
| // histogram. For values x_i this is: |
| // Sum[i=1..n]((x_i - mean)^2) |
| optional double sum_of_squared_deviations = 3; |
| |
| // `BucketOptions` describes the bucket boundaries used in the histogram. |
| message BucketOptions { |
| // Linear buckets with the following boundaries for indicies in 0 to n-1. |
| // 0: (-inf, start) |
| // i in [1, n-2]: [start + (i-1)*width, start + (i)*width) |
| // n-1: [start + (n-1)*width, inf) |
| // The 0th and n-1th bucket are the underflow/overflow buckets respectively. |
| message Linear { |
| optional int32 number_of_buckets = 1; |
| optional double width = 2; |
| optional double start = 3; |
| } |
| |
| // Exponential buckets where the growth factor between buckets is |
| // 2**(2**-scale). e.g. for 'scale=1' growth factor is 2**(2**(-1))=sqrt(2). |
| // Buckets with the following boundaries for indicies in 0 to n-1. |
| // 0th: (-inf, 0) |
| // 1st: [0, gf) |
| // i in [2, n-2]: [gf^(i-1), gf^i) |
| // n-1: [gf^(n-2), inf) |
| // The 0th and n-1th bucket are the underflow/overflow buckets respectively. |
| message Base2Exponent { |
| optional int32 number_of_buckets = 1; |
| optional int32 scale = 2; |
| } |
| oneof BucketType { |
| Linear linear = 1; |
| Base2Exponent exponential = 2; |
| } |
| } |
| optional BucketOptions bucket_options = 5; |
| repeated int64 bucket_counts = 6 [packed = true]; |
| } |
| |
| message GetWorkStreamTimingInfo { |
| enum Event { |
| UNKNOWN = 0; |
| // Work item creation started by the Windmill Worker. |
| GET_WORK_CREATION_START = 1; |
| // Work item creation finished by the Windmill Worker. |
| GET_WORK_CREATION_END = 2; |
| // The GetWorkResponse containing this work item is received by the Windmill |
| // Dispatcher. |
| GET_WORK_RECEIVED_BY_DISPATCHER = 3; |
| // The GetWorkResponse containing this work item is forwarded by the |
| // Windmill Dispatcher to the user worker. |
| GET_WORK_FORWARDED_BY_DISPATCHER = 4; |
| } |
| |
| // Critical event of the work item processing. |
| optional Event event = 1; |
| |
| // Timestamp of the event. |
| optional int64 timestamp_usec = 2; |
| } |
| |
| message OutputMessageBundle { |
| optional string destination_computation_id = 1; |
| optional string destination_stream_id = 3; |
| repeated KeyedMessageBundle bundles = 2; |
| } |
| |
| message PubSubMessageBundle { |
| required string topic = 1; |
| repeated Message messages = 2; |
| optional string timestamp_label = 3; |
| optional string id_label = 4; |
| // If set, then each message is interpreted as a PubsubMessage proto, |
| // containing a payload and attributes. |
| optional bool with_attributes = 5; |
| } |
| |
| message TimerBundle { |
| repeated Timer timers = 1; |
| } |
| |
| message Value { |
| required int64 timestamp = 1 [default = -0x8000000000000000]; |
| required bytes data = 2; |
| } |
| |
| message TagValue { |
| required bytes tag = 1; |
| optional Value value = 2; |
| optional string state_family = 3; |
| } |
| |
| message TagValuePrefix { |
| optional bytes tag_prefix = 1; |
| optional string state_family = 2; |
| } |
| |
| message TagValuePrefixRequest { |
| optional bytes tag_prefix = 1; |
| optional string state_family = 2; |
| // In request: A previously returned continuation_token from an earlier |
| // request. Indicates we wish to fetch the next page of values. |
| // In response: Copied from request. |
| optional bytes request_position = 3; |
| optional int64 fetch_max_bytes = 4 [default = 0x6400000]; |
| } |
| |
| message TagValuePrefixResponse { |
| optional bytes tag_prefix = 1; |
| optional string state_family = 2; |
| repeated TagValue tag_values = 3; |
| optional bytes continuation_position = 4; |
| optional bytes request_position = 5; |
| } |
| |
| message TagBag { |
| optional bytes tag = 1; |
| // In request: All existing items in the list will be deleted. If new values |
| // are present they will be written. |
| optional bool delete_all = 2; |
| // In request: The given values will be written to the collection. |
| // In response: Values that are present in the collection. |
| repeated bytes values = 3; |
| optional string state_family = 4; |
| // In request: A previously returned continuation_position from an earlier |
| // read response. Indicates we wish to fetch the next page of values. |
| // In response: Copied from request. |
| optional int64 request_position = 7; |
| // In response only: Set when there are values after those returned above, but |
| // they were suppressed to respect the fetch_max_bytes limit. Subsequent |
| // requests should copy this to request_position to retrieve the next page of |
| // values. |
| optional int64 continuation_position = 5; |
| // In request: Limits the size of the fetched bag to this byte limit. |
| optional int64 fetch_max_bytes = 6 [default = 0x7fffffffffffffff]; |
| } |
| |
| // For a given sharding key and state family, a TagMultimap is a collection of |
| // `entry_name, bag of values` pairs, each pair is a TagMultimapEntry. |
| // |
| // The request_position, continuation_position and fetch_max_bytes fields in |
| // TagMultimapEntry are used for the pagination and byte limiting of individual |
| // entry fetch requests get(entry_name); while those fields in |
| // TagMultimapFetchRequest and TagMultimapFetchResponse are used for full |
| // multimap fetch requests entry_names() and entries(). |
| // Do not set both in a TagMultimapFetchRequest at the same time. |
| message TagMultimapEntry { |
| optional bytes entry_name = 1; |
| // In update request: if true all values associated with this entry_name will |
| // be deleted. If new values are present they will be written. |
| optional bool delete_all = 2; |
| // In update request: The given values will be added to the collection and |
| // associated with entry_name. |
| // In fetch response: Values that are associated with this entry_name in the |
| // multimap. |
| repeated bytes values = 3; |
| // In fetch request: A previously returned continuation_position from an |
| // earlier read response. Indicates we wish to fetch the next page of values. |
| // If this is the first request, set to empty. |
| // In fetch response: copied from request. |
| optional int64 request_position = 4; |
| // In fetch response: Set when there are values after those returned above, |
| // but they were suppressed to respect the fetch_max_bytes limit. Subsequent |
| // requests should copy this to request_position to retrieve the next page of |
| // values. |
| optional int64 continuation_position = 5; |
| // In fetch request: Limits the size of the fetched values to this byte limit. |
| // A lower limit may be imposed by the service. |
| optional int64 fetch_max_bytes = 6 [default = 0x7fffffffffffffff]; |
| } |
| |
| message TagMultimapFetchRequest { |
| optional bytes tag = 1; |
| optional string state_family = 2; |
| // If true, values will be omitted in the response. |
| optional bool fetch_entry_names_only = 3; |
| // Limits the size of the fetched entries to this byte limit. A lower limit |
| // may be imposed by the service. |
| optional int64 fetch_max_bytes = 4 [default = 0x7fffffffffffffff]; |
| // A previously returned continuation_position from an earlier fetch response. |
| // Indicates we wish to fetch the next page of entries. If this is the first |
| // request, set to empty. |
| optional bytes request_position = 5; |
| // Fetch the requested subset of entries only. Will fetch all entries if left |
| // empty. Entries in entries_to_fetch should only have the entry_name, |
| // request_position and fetch_max_bytes set. |
| repeated TagMultimapEntry entries_to_fetch = 6; |
| } |
| |
| message TagMultimapFetchResponse { |
| optional bytes tag = 1; |
| optional string state_family = 2; |
| repeated TagMultimapEntry entries = 3; |
| // Will be set only if the entries_to_fetch in the request is empty when |
| // we want to fetch all entries. |
| optional bytes continuation_position = 4; |
| // Request position copied from request. |
| optional bytes request_position = 5; |
| } |
| |
| message TagMultimapUpdateRequest { |
| optional bytes tag = 1; |
| optional string state_family = 2; |
| // All entries including the values in the multimap will be deleted. |
| optional bool delete_all = 3; |
| // If delete_all is true and updates are not empty, the multimap will first be |
| // cleared and then those updates will be applied. |
| repeated TagMultimapEntry updates = 4; |
| } |
| |
| // A single entry in a sorted list |
| message SortedListEntry { |
| // The value payload. |
| optional bytes value = 1; |
| // The sort key. must be strictly smaller than than 0x7fffffffffffffff. |
| optional int64 sort_key = 2; |
| // A unique id for this element.The combination of the sort key and the id |
| // identify an element. Entries written with the same sort_key and id will |
| // overwrite the previous entry. |
| optional uint64 id = 3; |
| } |
| |
| message SortedListRange { |
| // start and limit describe the range of SortedList entries |
| // to be fetched. The targeted range is [start, limit). |
| optional int64 start = 1 [default = -0x8000000000000000]; |
| optional int64 limit = 2 [default = 0x7fffffffffffffff]; |
| } |
| |
| message TagSortedListFetchRequest { |
| optional bytes tag = 1; |
| optional string state_family = 2; |
| repeated SortedListRange fetch_ranges = 3; |
| |
| // Sets a limit on the maximum response value bytes |
| optional int64 fetch_max_bytes = 5 [default = 0x7fffffffffffffff]; |
| // If a previous TagSortedListFetchRequest returned a continuation_position, |
| // then passing that token into requestPosition will cause the subsequent |
| // request to continue the previous request. |
| optional bytes request_position = 6; |
| } |
| |
| message TagSortedListFetchResponse { |
| optional bytes tag = 1; |
| optional string state_family = 2; |
| repeated SortedListEntry entries = 3; |
| optional bytes continuation_position = 4; |
| // Fetch ranges copied from request. |
| repeated SortedListRange fetch_ranges = 5; |
| // Request position copied from request. |
| optional bytes request_position = 6; |
| } |
| |
| message TagSortedListUpdateRequest { |
| optional bytes tag = 1; |
| optional string state_family = 2; |
| |
| // The list of inserts and deletes to be applied. Deletes always happen before |
| // inserts. Inserts with the same sort_key+id overwrite each other. |
| repeated TagSortedListDeleteRequest deletes = 3; |
| repeated TagSortedListInsertRequest inserts = 4; |
| } |
| |
| message TagSortedListInsertRequest { |
| repeated SortedListEntry entries = 1; |
| } |
| |
| message TagSortedListDeleteRequest { |
| optional SortedListRange range = 1; |
| } |
| message GlobalDataId { |
| required string tag = 1; |
| required bytes version = 2; |
| } |
| |
| message GlobalData { |
| required GlobalDataId data_id = 1; |
| optional bool is_ready = 2; |
| optional bytes data = 3; |
| optional string state_family = 4; |
| } |
| |
| message SourceState { |
| optional bytes state = 1; |
| repeated fixed64 finalize_ids = 2; |
| optional bool only_finalize = 3; |
| optional bytes offset_limit = 4; |
| } |
| |
| message WatermarkHold { |
| required bytes tag = 1; |
| repeated int64 timestamps = 2 [packed = true]; |
| optional bool reset = 3; |
| optional string state_family = 4; |
| } |
| |
| // Proto describing a hot key detected on a given WorkItem. |
| message HotKeyInfo { |
| // The age of the hot key measured from when it was first detected. |
| optional int64 hot_key_age_usec = 1; |
| } |
| |
| message WorkItem { |
| required bytes key = 1; |
| required fixed64 work_token = 2; |
| optional fixed64 sharding_key = 9; |
| |
| optional fixed64 cache_token = 7; |
| |
| repeated InputMessageBundle message_bundles = 3; |
| optional TimerBundle timers = 4; |
| repeated GlobalDataId global_data_id_notifications = 5; |
| optional SourceState source_state = 6; |
| optional int64 output_data_watermark = 8 [default = -0x8000000000000000]; |
| // Indicates that this is a new key with no data associated. This allows |
| // the harness to optimize data fetching. |
| optional bool is_new_key = 10; |
| |
| // A hot key is a symptom of poor data distribution in which there are enough |
| // elements mapped to a single key to impact pipeline performance. When |
| // present, this field includes metadata associated with any hot key. |
| optional HotKeyInfo hot_key_info = 11; |
| } |
| |
| message ComputationWorkItems { |
| required string computation_id = 1; |
| repeated WorkItem work = 2; |
| optional int64 input_data_watermark = 3 [default = -0x8000000000000000]; |
| optional int64 dependent_realtime_input_watermark = 4 |
| [default = -0x8000000000000000]; |
| optional bool drain_mode = 6; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // API calls |
| |
| // GetWork |
| |
| message GetWorkRequest { |
| required fixed64 client_id = 1; |
| optional string worker_id = 4; |
| optional string job_id = 5; |
| optional string project_id = 7; |
| optional int64 max_items = 2 [default = 0xffffffff]; |
| optional int64 max_bytes = 3 [default = 0x7fffffffffffffff]; |
| reserved 6; |
| } |
| |
| message GetWorkResponse { |
| repeated ComputationWorkItems work = 1; |
| } |
| |
| // GetData |
| |
| message KeyedGetDataRequest { |
| optional bytes key = 1; |
| required fixed64 work_token = 2; |
| optional fixed64 sharding_key = 6; |
| optional fixed64 cache_token = 11; |
| repeated TagValue values_to_fetch = 3; |
| repeated TagValuePrefixRequest tag_value_prefixes_to_fetch = 10; |
| repeated TagBag bags_to_fetch = 8; |
| // Must be at most one sorted_list_to_fetch for a given state family and tag. |
| repeated TagSortedListFetchRequest sorted_lists_to_fetch = 9; |
| // Must be at most one multimaps_to_fetch for a given state family and tag. |
| repeated TagMultimapFetchRequest multimaps_to_fetch = 12; |
| repeated WatermarkHold watermark_holds_to_fetch = 5; |
| repeated LatencyAttribution latency_attribution = 13; |
| |
| optional int64 max_bytes = 7; |
| reserved 4; |
| } |
| |
| message ComputationGetDataRequest { |
| required string computation_id = 1; |
| repeated KeyedGetDataRequest requests = 2; |
| } |
| |
| message GetDataRequest { |
| optional string job_id = 4; |
| optional string project_id = 5; |
| repeated ComputationGetDataRequest requests = 1; |
| repeated GlobalDataRequest global_data_fetch_requests = 3; |
| // Assigned worker id for the instance. |
| optional string worker_id = 6; |
| |
| // SE only. Will only be set by compatible client |
| repeated ComputationHeartbeatRequest computation_heartbeat_request = 7; |
| // DEPRECATED |
| repeated GlobalDataId global_data_to_fetch = 2; |
| } |
| |
| message KeyedGetDataResponse { |
| required bytes key = 1; |
| optional fixed64 sharding_key = 7; |
| // The response for this key is not populated due to the fetch failing. |
| optional bool failed = 2; |
| repeated TagValue values = 3; |
| repeated TagValuePrefixResponse tag_value_prefixes = 9; |
| repeated TagBag bags = 6; |
| // There is one TagSortedListFetchResponse per state-family, tag pair. |
| repeated TagSortedListFetchResponse tag_sorted_lists = 8; |
| // There is one TagMultimapFetchResponse per state-family, tag pair. |
| repeated TagMultimapFetchResponse tag_multimaps = 10; |
| repeated WatermarkHold watermark_holds = 5; |
| |
| reserved 4; |
| } |
| |
| message ComputationGetDataResponse { |
| required string computation_id = 1; |
| repeated KeyedGetDataResponse data = 2; |
| } |
| |
| message GetDataResponse { |
| repeated ComputationGetDataResponse data = 1; |
| repeated GlobalData global_data = 2; |
| // Only set if ComputationHeartbeatRequest was sent, prior versions do not |
| // expect a response for heartbeats. SE only. |
| repeated ComputationHeartbeatResponse computation_heartbeat_response = 3; |
| } |
| |
| // Heartbeats |
| // |
| // Heartbeats are sent over the GetData stream in Streaming Engine and |
| // indicates the work item that the user worker has previously received from |
| // GetWork but not yet committed with CommitWork. |
| // Note that implicit heartbeats not expecting a response may be sent as |
| // special KeyedGetDataRequests see function KeyedGetDataRequestIsHeartbeat. |
| // SE only. |
| message HeartbeatRequest { |
| optional fixed64 sharding_key = 1; |
| optional fixed64 work_token = 2; |
| optional fixed64 cache_token = 3; |
| repeated LatencyAttribution latency_attribution = 4; |
| } |
| |
| // Responses for heartbeat requests, indicating which work is no longer valid |
| // on the windmill worker and may be dropped/cancelled in the client. |
| // SE only. |
| message HeartbeatResponse { |
| optional fixed64 sharding_key = 1; |
| optional fixed64 work_token = 2; |
| optional fixed64 cache_token = 3; |
| optional bool failed = 4; |
| } |
| |
| message ComputationHeartbeatRequest { |
| optional string computation_id = 1; |
| repeated HeartbeatRequest heartbeat_requests = 2; |
| } |
| |
| message ComputationHeartbeatResponse { |
| optional string computation_id = 1; |
| repeated HeartbeatResponse heartbeat_responses = 2; |
| } |
| |
| // CommitWork |
| |
| message Counter { |
| optional string name = 1; |
| enum Kind { |
| SUM = 0; |
| MAX = 1; |
| MIN = 2; |
| MEAN = 3; |
| }; |
| optional Kind kind = 2; |
| |
| // For SUM, MAX, MIN, AND, OR, MEAN at most one of the following should be |
| // set. For MEAN it is the sum |
| optional double double_scalar = 3; |
| optional int64 int_scalar = 4; |
| |
| // Only set for MEAN. Count of elements contributing to the sum. |
| optional int64 mean_count = 6; |
| |
| // True if this metric is reported as the total cumulative aggregate |
| // value accumulated since the worker started working on this WorkItem. |
| // By default this is false, indicating that this metric is reported |
| // as a delta that is not associated with any WorkItem. |
| optional bool cumulative = 7; |
| } |
| |
| message GlobalDataRequest { |
| required GlobalDataId data_id = 1; |
| optional int64 existence_watermark_deadline = 2 [default = 0x7FFFFFFFFFFFFFFF]; |
| optional string state_family = 3; |
| |
| // Computation Id for this GlobalDataRequest. Only set for heartbeats. |
| optional string computation_id = 4; |
| // Dataflow defined metrics keyed by metrics namespace and unfused step name. |
| // All unfused steps in this list belong to the fused stage that |
| // computation_id refers to. Only set for heartbeats. |
| repeated PerStepNamespaceMetrics per_step_namespace_metrics = 5; |
| } |
| |
| // next id: 28 |
| message WorkItemCommitRequest { |
| required bytes key = 1; |
| required fixed64 work_token = 2; |
| optional fixed64 sharding_key = 15; |
| optional fixed64 cache_token = 16; |
| |
| optional bool exceeds_max_work_item_commit_bytes = 20; |
| optional int64 estimated_work_item_commit_bytes = 21; |
| |
| repeated OutputMessageBundle output_messages = 3; |
| repeated PubSubMessageBundle pubsub_messages = 7; |
| repeated Timer output_timers = 4; |
| repeated TagValue value_updates = 5; |
| repeated TagValuePrefix tag_value_prefix_deletes = 25; |
| repeated TagBag bag_updates = 18; |
| repeated TagSortedListUpdateRequest sorted_list_updates = 24; |
| repeated TagMultimapUpdateRequest multimap_updates = 26; |
| repeated Counter counter_updates = 8; |
| repeated GlobalDataRequest global_data_requests = 11; |
| repeated GlobalData global_data_updates = 10; |
| optional SourceState source_state_updates = 12; |
| optional int64 source_watermark = 13 [default = -0x8000000000000000]; |
| optional int64 source_backlog_bytes = 17 [default = -1]; |
| optional int64 source_bytes_processed = 22; |
| |
| repeated WatermarkHold watermark_holds = 14; |
| |
| // Collected work item processing state durations. |
| repeated LatencyAttribution per_work_item_latency_attributions = 27; |
| |
| // DEPRECATED |
| repeated GlobalDataId global_data_id_requests = 9; |
| |
| reserved 6, 19, 23; |
| } |
| |
| message ComputationCommitWorkRequest { |
| required string computation_id = 1; |
| repeated WorkItemCommitRequest requests = 2; |
| } |
| |
| message CommitWorkRequest { |
| optional string job_id = 2; |
| optional string project_id = 3; |
| repeated ComputationCommitWorkRequest requests = 1; |
| } |
| |
| // Validation status of applying a single WorkItem. |
| // Statuses earlier in this list supersede later ones. |
| enum CommitStatus { |
| // Default proto value; do not use. |
| DEFAULT = 0; |
| // Commit succeeded. |
| OK = 1; |
| // Some requested resource to modify was not found. |
| NOT_FOUND = 2; |
| // Some part of this request exceeded size limits. |
| TOO_LARGE = 3; |
| // Token for this commit is invalid. |
| INVALID_TOKEN = 4; |
| // This key and token is already committing. |
| ALREADY_IN_COMMIT = 5; |
| // The CommitWork request had validation errors. |
| VALIDATION_FAILED = 6; |
| // The request was aborted. |
| ABORTED = 7; |
| } |
| |
| message CommitWorkResponse {} |
| |
| // Configuration |
| |
| message GetConfigRequest { |
| optional string job_id = 2; |
| repeated string computations = 1; |
| } |
| |
| message ComputationConfig { |
| // Map from user name of stateful transforms in this stage to their state |
| // family. |
| message TransformUserNameToStateFamilyEntry { |
| optional string transform_user_name = 1; |
| optional string state_family = 2; |
| } |
| repeated TransformUserNameToStateFamilyEntry |
| transform_user_name_to_state_family = 1; |
| } |
| |
| message GetConfigResponse { |
| repeated string cloud_works = 1; |
| |
| message NameMapEntry { |
| optional string user_name = 1; |
| optional string system_name = 2; |
| } |
| |
| // Map of user names to system names |
| repeated NameMapEntry name_map = 2; |
| |
| message SystemNameToComputationIdMapEntry { |
| optional string system_name = 1; |
| optional string computation_id = 2; |
| } |
| repeated SystemNameToComputationIdMapEntry |
| system_name_to_computation_id_map = 3; |
| |
| // Map of computation id to ComputationConfig. |
| message ComputationConfigMapEntry { |
| optional string computation_id = 1; |
| optional ComputationConfig computation_config = 2; |
| } |
| repeated ComputationConfigMapEntry computation_config_map = 4; |
| } |
| |
| // Reporting |
| |
| message Exception { |
| repeated string stack_frames = 1; |
| optional Exception cause = 2; |
| } |
| |
| message ReportStatsRequest { |
| optional string job_id = 6; |
| optional string computation_id = 1; |
| optional bytes key = 2; |
| optional fixed64 work_token = 3; |
| optional fixed64 sharding_key = 7; |
| repeated Exception exceptions = 4; |
| repeated Counter counter_updates = 5; |
| } |
| |
| message ReportStatsResponse { |
| optional bool failed = 1; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Streaming API |
| |
| message StreamingGetWorkRequest { |
| oneof chunk_type { |
| GetWorkRequest request = 1; |
| // Initial message is GetWorkRequest with subsequent messages being extensions. |
| StreamingGetWorkRequestExtension request_extension = 2; |
| } |
| |
| // Ignored after initial request. |
| optional bool supports_multiple_work_items_in_chunk = 5 [default = false]; |
| } |
| |
| message StreamingGetWorkRequestExtension { |
| optional int64 max_items = 1 [default = 0xffffffff]; |
| optional int64 max_bytes = 2 [default = 0x7fffffffffffffff]; |
| } |
| |
| message StreamingGetWorkResponseChunk { |
| // Not necessary for routing, but allows for optimization that if this is |
| // unset it is the same as the previous chunk. |
| optional ComputationWorkItemMetadata computation_metadata = 1; |
| |
| // The serialized bytes are a serialized WorkItem. |
| optional int64 remaining_bytes_for_work_item = 2; |
| repeated bytes serialized_work_item = 3; |
| |
| // Indicates which response stream this message corresponds to. A sequence of |
| // responses for the same stream_id should be deserialized together. Responses |
| // from other stream_ids may be interleaved on the physical stream. |
| optional fixed64 stream_id = 4; |
| |
| // Timing infos for the work item. Windmill Dispatcher and user worker should |
| // propagate critical event timings if the list is not empty. |
| repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8; |
| |
| // reserved field 5 |
| } |
| |
| message ComputationWorkItemMetadata { |
| optional string computation_id = 1; |
| optional int64 input_data_watermark = 2 [default = -0x8000000000000000]; |
| optional int64 dependent_realtime_input_watermark = 3 |
| [default = -0x8000000000000000]; |
| optional bool drain_mode = 5; |
| } |
| |
| message StreamingGetDataRequest { |
| // The first request on the stream should consist solely of the header. All |
| // subsequent requests should not have the header set. |
| optional JobHeader header = 2; |
| // A request_id for each data request should be unique within a stream. All |
| // response items corresponding to a data request will be identified by the |
| // same request_id. |
| // request_id field enumerates global data requests before state requests. |
| // REQUIRES: |
| // request_id.size() = global_data_request.size() + state_request.size() |
| // for non-heartbeats |
| // request_id.empty() for heartbeats |
| repeated fixed64 request_id = 1; |
| repeated GlobalDataRequest global_data_request = 3; |
| repeated ComputationGetDataRequest state_request = 4; |
| // Will only be set by compatible client |
| repeated ComputationHeartbeatRequest computation_heartbeat_request = 5; |
| } |
| |
| message StreamingGetDataResponse { |
| // Indicates which request this message corresponds to. A sequence of |
| // Responses from other requests may be interleaved on the physical stream. |
| repeated fixed64 request_id = 1; |
| |
| // The serialized bytes are a one of serialized KeyedGetDataResponse or |
| // GlobalData. |
| repeated bytes serialized_response = 2; |
| // Remaining bytes field applies only to the last serialized_response |
| optional int64 remaining_bytes_for_response = 3; |
| |
| // Only set if ComputationHeartbeatRequest was sent, prior versions do not |
| // expect a response for heartbeats. |
| repeated ComputationHeartbeatResponse computation_heartbeat_response = 5; |
| |
| reserved 4; |
| } |
| |
| message StreamingCommitWorkRequest { |
| // The first request on the stream should consist solely of the header. All |
| // subsequent requests should not have the header set. |
| optional JobHeader header = 1; |
| repeated StreamingCommitRequestChunk commit_chunk = 2; |
| } |
| |
| message JobHeader { |
| optional string job_id = 1; |
| optional string project_id = 2; |
| // Worker id is meant for logging only. Do not rely on it for other decisions. |
| optional string worker_id = 3; |
| optional fixed64 client_id = 4; |
| optional string region_id = 5; |
| // Used by the user worker to communicate to a specific windmill worker. This |
| // is initially passed to the user worker via GetWorkerMetadata. |
| optional string backend_worker_token = 6; |
| } |
| |
| message StreamingCommitRequestChunk { |
| // A request_id for each complete request (a set of chunks) should be unique |
| // within a stream. The response corresponding to a request will be identified |
| // by the same request_id. |
| optional fixed64 request_id = 1; |
| |
| // Routing-level fields. Within a stream we can support the optimization that |
| // an unset field has the same value as the previous chunk. This provides |
| // similar benefit to our current batching without requiring separate batching |
| // from transport implementation. |
| optional string computation_id = 2; |
| optional fixed64 sharding_key = 3; |
| |
| // Payload-level fields. The serialized bytes are a serialized |
| // WorkItemCommitRequest. The request is serialized to avoid parsing in the |
| // dispatcher and to allow for requests for which single fields exceed the |
| // byte limit we wish to enforce for message size. The remaining bytes field |
| // is used to indicate this continuation. Workers will perform a streaming |
| // parse of serialized bytes to generate the complete WorkItemCommitRequest |
| // before handing off to the WindmillHost for processing. |
| optional int64 remaining_bytes_for_work_item = 4; |
| optional bytes serialized_work_item_commit = 5; |
| } |
| |
| message StreamingCommitResponse { |
| // Indicates which request this message corresponds to. |
| repeated fixed64 request_id = 1; |
| |
| // Commit status for each request_id in the message. Indices must line up |
| // with the request_id field, but trailing OKs may be omitted. |
| repeated CommitStatus status = 2; |
| } |
| |
| message WorkerMetadataRequest { |
| optional JobHeader header = 1; |
| } |
| |
| // Converted into org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints |
| // used to connect to Streaming Engine. |
| message WorkerMetadataResponse { |
| // The metadata version increases with every modification. Within a single |
| // stream it will always be increasing. The version may be used across streams |
| // to ensure that the view of the metadata does not move backwards. |
| optional int64 metadata_version = 1; |
| |
| // Endpoints that should be used for requesting work with GetWorkStream. |
| // Additional data for returned work should be fetched from the endpoint with |
| // GetDataStream. The work should be committed to the endpoint with |
| // CommitWorkStream. Each response on this stream replaces the previous, and |
| // connections to endpoints that are no longer present should be closed. |
| message Endpoint { |
| // IPv6 address of a streaming engine windmill worker. |
| optional string direct_endpoint = 1; |
| optional string backend_worker_token = 2; |
| optional int64 port = 3; |
| } |
| |
| repeated Endpoint work_endpoints = 2; |
| |
| // Maps from GlobalData tag to the endpoint that should be used for GetData |
| // calls to retrieve that global data. |
| map<string, Endpoint> global_data_endpoints = 3; |
| |
| // Used to set gRPC authority. |
| optional string external_endpoint = 5; |
| |
| enum EndpointType { |
| UNKNOWN = 0; |
| CLOUDPATH = 1; |
| BORG = 2; |
| DIRECTPATH = 3; |
| } |
| |
| optional EndpointType endpoint_type = 6 [default = CLOUDPATH]; |
| |
| reserved 4; |
| } |
| |
| |
| // Client-side settings for gRPC flow control set on the user worker when |
| // constructing the channels and stubs. |
| message UserWorkerGrpcFlowControlSettings { |
| // If true, the user worker will use gRPCs automatic flow control for |
| // windmill RPCs. |
| optional bool enable_auto_flow_control = 1 [default = false]; |
| |
| // The flow control window size for windmill RPCs. If |
| // enable_auto_flow_control is true, this is the initial window size and may |
| // be resized by the gRPC framework. Default and minimum is 10MiB. |
| optional int32 flow_control_window_bytes = 2 [default = 10485760]; |
| |
| // Specifies how many bytes must be queued before the call is considered not |
| // ready to send more messages. |
| optional int32 on_ready_threshold_bytes = 3; |
| } |
| |
| enum ConnectivityType { |
| CONNECTIVITY_TYPE_DEFAULT = 0; |
| CONNECTIVITY_TYPE_CLOUDPATH = 1; |
| CONNECTIVITY_TYPE_DIRECTPATH = 2; |
| } |
| |
| // Settings to control runtime behavior of the java runner v1 user worker. |
| message UserWorkerRunnerV1Settings { |
| optional UserWorkerGrpcFlowControlSettings flow_control_settings = 3; |
| |
| optional ConnectivityType connectivity_type = 4 |
| [default = CONNECTIVITY_TYPE_DEFAULT]; |
| |
| reserved 1, 2; |
| } |
| |
| service WindmillAppliance { |
| // Gets streaming Dataflow work. |
| rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse); |
| |
| // Gets data from Windmill. |
| rpc GetData(.windmill.GetDataRequest) returns (.windmill.GetDataResponse); |
| |
| // Commits previously acquired work. |
| rpc CommitWork(.windmill.CommitWorkRequest) |
| returns (.windmill.CommitWorkResponse); |
| |
| // Gets dependant configuration from windmill. |
| rpc GetConfig(.windmill.GetConfigRequest) |
| returns (.windmill.GetConfigResponse); |
| |
| // Reports stats to windmill. |
| rpc ReportStats(.windmill.ReportStatsRequest) |
| returns (.windmill.ReportStatsResponse); |
| } |