/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
// Windmill is the streaming execution engine behind Dataflow streaming
// workers; this file is the worker <-> Windmill wire contract.
syntax = "proto2";

package windmill;

// Generated Java classes live with the Dataflow worker harness.
option java_package = "org.apache.beam.runners.dataflow.worker.windmill";
option java_outer_classname = "Windmill";
| |
////////////////////////////////////////////////////////////////////////////////
// API Data types
| |
// A single data element together with its event timestamp.
message Message {
  // Event timestamp. The default is int64 minimum, used throughout this
  // file as a "-infinity" sentinel. Units are not stated here; presumably
  // microseconds (cf. HotKeyInfo.hot_key_age_usec) -- confirm.
  required int64 timestamp = 1 [default=-0x8000000000000000];
  // Opaque element payload.
  required bytes data = 2;
  // Optional opaque metadata accompanying the payload.
  optional bytes metadata = 3;
}
| |
// A timer associated with a key, identified by an opaque tag.
message Timer {
  // Opaque identifier for this timer.
  required bytes tag = 1;
  // Firing time; defaults to the int64-minimum "-infinity" sentinel.
  optional int64 timestamp = 2 [default=-0x8000000000000000];
  // Time domain of the timer.
  enum Type {
    WATERMARK = 0;
    REALTIME = 1;
    DEPENDENT_REALTIME = 2;
  }
  optional Type type = 3 [default = WATERMARK];
  // State family the timer belongs to (see ComputationConfig).
  optional string state_family = 4;
}
| |
// A bundle of input messages originating from a single computation.
message InputMessageBundle {
  // Id of the computation that produced these messages.
  required string source_computation_id = 1;
  repeated Message messages = 2;
}
| |
// A bundle of messages destined for a single key.
message KeyedMessageBundle {
  required bytes key = 1;
  optional fixed64 sharding_key = 4;
  repeated Message messages = 2;
  // Presumably ids parallel (by index) to `messages` -- confirm with the
  // consumer. NOTE(review): the name looks like a typo for "message_ids",
  // but renaming would change generated accessors, so it stays.
  repeated bytes messages_ids = 3;
}
| |
// Keyed message bundles addressed to a downstream computation or stream.
message OutputMessageBundle {
  // Destination computation; see also destination_stream_id.
  optional string destination_computation_id = 1;
  optional string destination_stream_id = 3;
  repeated KeyedMessageBundle bundles = 2;
}
| |
// A bundle of messages to publish to a Pub/Sub topic.
message PubSubMessageBundle {
  // Fully-specified Pub/Sub topic to publish to.
  required string topic = 1;
  repeated Message messages = 2;
  // Attribute labels used to carry the timestamp / id with each message.
  optional string timestamp_label = 3;
  optional string id_label = 4;
  // If set, then each message is interpreted as a PubsubMessage proto,
  // containing a payload and attributes.
  optional bool with_attributes = 5;
}
| |
// A collection of timers delivered together with a WorkItem.
message TimerBundle {
  repeated Timer timers = 1;
}
| |
// A timestamped opaque value, as stored for a TagValue.
message Value {
  // Defaults to the int64-minimum "-infinity" sentinel.
  required int64 timestamp = 1 [default=-0x8000000000000000];
  required bytes data = 2;
}
| |
// A single persistent value addressed by an opaque tag.
message TagValue {
  required bytes tag = 1;
  // Unset value presumably denotes absence/deletion -- confirm with server.
  optional Value value = 2;
  optional string state_family = 3;
}
| |
// An unordered collection (bag) of values addressed by an opaque tag.
// Used both to request reads/writes and to carry read results, with
// pagination via request_position / continuation_position.
message TagBag {
  optional bytes tag = 1;
  // In request: All existing items in the list will be deleted. If new values
  // are present they will be written.
  optional bool delete_all = 2;
  // In request: The given values will be written to the collection.
  // In response: Values that are present in the collection.
  repeated bytes values = 3;
  optional string state_family = 4;
  // In request: A previously returned continuation_position from an earlier
  // read response. Indicates we wish to fetch the next page of values.
  // In response: Copied from request.
  optional int64 request_position = 7;
  // In response only: Set when there are values after those returned above, but
  // they were suppressed to respect the fetch_max_bytes limit. Subsequent
  // requests should copy this to request_position to retrieve the next page of
  // values.
  optional int64 continuation_position = 5;
  // In request: Limits the size of the fetched bag to this byte limit.
  // Defaults to int64 max, i.e. effectively unlimited.
  optional int64 fetch_max_bytes = 6 [default = 0x7fffffffffffffff];
}
| |
// Identifies a piece of global (side-input) data: a tag plus a version.
message GlobalDataId {
  required string tag = 1;
  required bytes version = 2;
}
| |
// A piece of global (side-input) data and its readiness state.
message GlobalData {
  required GlobalDataId data_id = 1;
  // Whether the data identified by data_id is available.
  optional bool is_ready = 2;
  optional bytes data = 3;
  optional string state_family = 4;
}
| |
// Persisted state of a source, plus checkpoint-finalization bookkeeping.
message SourceState {
  // Opaque serialized source state.
  optional bytes state = 1;
  // Ids of checkpoints to finalize.
  repeated fixed64 finalize_ids = 2;
  // If set, only perform finalization; presumably `state` is ignored --
  // confirm with the server implementation.
  optional bool only_finalize = 3;
}
| |
// A set of output-watermark holds associated with an opaque tag.
message WatermarkHold {
  required bytes tag = 1;
  // Hold timestamps; packed for compact encoding.
  repeated int64 timestamps = 2 [packed=true];
  // If set, existing holds for this tag are replaced rather than merged --
  // presumably; confirm with the server implementation.
  optional bool reset = 3;
  optional string state_family = 4;
}
| |
| // Proto describing a hot key detected on a given WorkItem. |
| message HotKeyInfo { |
| // The age of the hot key measured from when it was first detected. |
| optional int64 hot_key_age_usec = 1; |
| } |
| |
// A unit of work for a single key: input messages, timers, side-input
// notifications and source state, guarded by a work token.
message WorkItem {
  required bytes key = 1;
  // Token identifying this lease of the key; echoed back on GetData and
  // CommitWork to validate the request.
  required fixed64 work_token = 2;
  optional fixed64 sharding_key = 9;

  // Token scoping the validity of locally cached state for this key.
  optional fixed64 cache_token = 7;

  repeated InputMessageBundle message_bundles = 3;
  optional TimerBundle timers = 4;
  // Side-input (global data) availability notifications.
  repeated GlobalDataId global_data_id_notifications = 5;
  optional SourceState source_state = 6;
  // Defaults to the int64-minimum "-infinity" sentinel.
  optional int64 output_data_watermark = 8 [default=-0x8000000000000000];
  // Indicates that this is a new key with no data associated. This allows
  // the harness to optimize data fetching.
  optional bool is_new_key = 10;

  // A hot key is a symptom of poor data distribution in which there are enough
  // elements mapped to a single key to impact pipeline performance. When
  // present, this field includes metadata associated with any hot key.
  optional HotKeyInfo hot_key_info = 11;
}
| |
// All WorkItems returned for one computation, with its input watermarks.
message ComputationWorkItems {
  required string computation_id = 1;
  repeated WorkItem work = 2;
  // Watermarks default to the int64-minimum "-infinity" sentinel.
  optional int64 input_data_watermark = 3 [default=-0x8000000000000000];
  optional int64 dependent_realtime_input_watermark = 4
      [default = -0x8000000000000000];
}
| |
////////////////////////////////////////////////////////////////////////////////
// API calls

// GetWork
| |
// Request for new work, bounded by item count and byte size.
message GetWorkRequest {
  // Identifies the requesting client instance.
  required fixed64 client_id = 1;
  optional string worker_id = 4;
  optional string job_id = 5;
  optional string project_id = 7;
  // Defaults are effectively "unlimited".
  optional int64 max_items = 2 [default = 0xffffffff];
  optional int64 max_bytes = 3 [default = 0x7fffffffffffffff];
  reserved 6;
}
| |
// Work returned for the request, grouped per computation.
message GetWorkResponse {
  repeated ComputationWorkItems work = 1;
}
| |
| // GetData |
| |
// State fetch request for a single key, validated by the work token.
message KeyedGetDataRequest {
  required bytes key = 1;
  // Must match the token of the WorkItem being processed.
  required fixed64 work_token = 2;
  optional fixed64 sharding_key = 6;
  repeated TagValue values_to_fetch = 3;
  repeated TagBag bags_to_fetch = 8;
  repeated WatermarkHold watermark_holds_to_fetch = 5;

  // Byte limit for the fetched data.
  optional int64 max_bytes = 7;
  reserved 4;
}
| |
// Per-key fetch requests for one computation.
message ComputationGetDataRequest {
  required string computation_id = 1;
  repeated KeyedGetDataRequest requests = 2;
}
| |
// Top-level state/side-input fetch request.
message GetDataRequest {
  optional string job_id = 4;
  optional string project_id = 5;
  repeated ComputationGetDataRequest requests = 1;
  repeated GlobalDataRequest global_data_fetch_requests = 3;

  // DEPRECATED: superseded by global_data_fetch_requests.
  repeated GlobalDataId global_data_to_fetch = 2;
}
| |
// Fetched state for a single key.
message KeyedGetDataResponse {
  required bytes key = 1;
  // The response for this key is not populated due to the fetch failing.
  optional bool failed = 2;
  repeated TagValue values = 3;
  repeated TagBag bags = 6;
  repeated WatermarkHold watermark_holds = 5;

  reserved 4;
}
| |
// Per-key fetch results for one computation.
message ComputationGetDataResponse {
  required string computation_id = 1;
  repeated KeyedGetDataResponse data = 2;
}
| |
// Top-level fetch response: per-computation state plus global data.
message GetDataResponse {
  repeated ComputationGetDataResponse data = 1;
  repeated GlobalData global_data = 2;
}
| |
| // CommitWork |
| |
// A named metric update reported with a WorkItem.
message Counter {
  optional string name = 1;
  // Aggregation kind applied to the reported values.
  enum Kind {
    SUM = 0;
    MAX = 1;
    MIN = 2;
    MEAN = 3;
  }
  optional Kind kind = 2;

  // For SUM, MAX, MIN, AND, OR, MEAN at most one of the following should be
  // set. For MEAN it is the sum
  optional double double_scalar = 3;
  optional int64 int_scalar = 4;

  // Only set for MEAN. Count of elements contributing to the sum.
  optional int64 mean_count = 6;

  // True if this metric is reported as the total cumulative aggregate
  // value accumulated since the worker started working on this WorkItem.
  // By default this is false, indicating that this metric is reported
  // as a delta that is not associated with any WorkItem.
  optional bool cumulative = 7;
}
| |
// Request for a piece of global (side-input) data.
message GlobalDataRequest {
  required GlobalDataId data_id = 1;
  // Defaults to int64 max, i.e. effectively no deadline.
  optional int64 existence_watermark_deadline = 2 [default=0x7FFFFFFFFFFFFFFF];
  optional string state_family = 3;
}
| |
| // next id: 19 |
| message WorkItemCommitRequest { |
| required bytes key = 1; |
| required fixed64 work_token = 2; |
| optional fixed64 sharding_key = 15; |
| optional fixed64 cache_token = 16; |
| |
| repeated OutputMessageBundle output_messages = 3; |
| repeated PubSubMessageBundle pubsub_messages = 7; |
| repeated Timer output_timers = 4; |
| repeated TagValue value_updates = 5; |
| repeated TagBag bag_updates = 18; |
| repeated Counter counter_updates = 8; |
| repeated GlobalDataRequest global_data_requests = 11; |
| repeated GlobalData global_data_updates = 10; |
| optional SourceState source_state_updates = 12; |
| optional int64 source_watermark = 13 [default=-0x8000000000000000]; |
| optional int64 source_backlog_bytes = 17 [default=-1]; |
| repeated WatermarkHold watermark_holds = 14; |
| |
| // DEPRECATED |
| repeated GlobalDataId global_data_id_requests = 9; |
| |
| reserved 6; |
| } |
| |
// Commit requests for one computation.
message ComputationCommitWorkRequest {
  required string computation_id = 1;
  repeated WorkItemCommitRequest requests = 2;
}
| |
// Top-level commit request, grouped per computation.
message CommitWorkRequest {
  optional string job_id = 2;
  optional string project_id = 3;
  repeated ComputationCommitWorkRequest requests = 1;
}
| |
| // Validation status of applying a single WorkItem. |
| // Statuses earlier in this list supersede later ones. |
| enum CommitStatus { |
| // Default proto value; do not use. |
| DEFAULT = 0; |
| // Commit succeeded. |
| OK = 1; |
| // Some requested resource to modify was not found. |
| NOT_FOUND = 2; |
| // Some part of this request exceeded size limits. |
| TOO_LARGE = 3; |
| // Token for this commit is invalid. |
| INVALID_TOKEN = 4; |
| // This key and token is already committing. |
| ALREADY_IN_COMMIT = 5; |
| // The CommitWork request had validation errors. |
| VALIDATION_FAILED = 6; |
| // The request was aborted. |
| ABORTED = 7; |
| } |
| |
| message CommitWorkResponse {} |
| |
| // Configuration |
| |
// Requests configuration for the named computations.
message GetConfigRequest {
  optional string job_id = 2;
  repeated string computations = 1;
}
| |
// Per-computation configuration.
message ComputationConfig {
  // Map from user name of stateful transforms in this stage to their state
  // family.
  message TransformUserNameToStateFamilyEntry {
    optional string transform_user_name = 1;
    optional string state_family = 2;
  }
  repeated TransformUserNameToStateFamilyEntry
      transform_user_name_to_state_family = 1;
}
| |
// Configuration for the requested computations, plus name mappings.
// Map-like data is encoded as repeated entry messages rather than proto
// maps (proto2-era style; also keeps entry order stable).
message GetConfigResponse {
  // Serialized cloud work specifications -- presumably serialized
  // MapTask/work protos; confirm with the consumer.
  repeated string cloud_works = 1;

  message NameMapEntry {
    optional string user_name = 1;
    optional string system_name = 2;
  }

  // Map of user names to system names
  repeated NameMapEntry name_map = 2;

  message SystemNameToComputationIdMapEntry {
    optional string system_name = 1;
    optional string computation_id = 2;
  }
  repeated SystemNameToComputationIdMapEntry
      system_name_to_computation_id_map = 3;

  // Map of computation id to ComputationConfig.
  message ComputationConfigMapEntry {
    optional string computation_id = 1;
    optional ComputationConfig computation_config = 2;
  }
  repeated ComputationConfigMapEntry computation_config_map = 4;
}
| |
| // Reporting |
| |
// A reported exception: stack frames plus an optional chained cause.
// NOTE: self-recursive via `cause`; receivers should bound the depth.
message Exception {
  repeated string stack_frames = 1;
  optional Exception cause = 2;
}
| |
// Reports exceptions and counter updates for a (possibly keyed) work unit.
message ReportStatsRequest {
  optional string job_id = 6;
  optional string computation_id = 1;
  optional bytes key = 2;
  optional fixed64 work_token = 3;
  optional fixed64 sharding_key = 7;
  repeated Exception exceptions = 4;
  repeated Counter counter_updates = 5;
}
| |
// Acknowledgement for ReportStats.
message ReportStatsResponse {
  // Set when the report was not accepted.
  optional bool failed = 1;
}
| |
////////////////////////////////////////////////////////////////////////////////
// Streaming API
| |
// A message on the streaming GetWork request stream: either the initial
// request or an extension raising the outstanding budget.
message StreamingGetWorkRequest {
  oneof chunk_type {
    GetWorkRequest request = 1;
    StreamingGetWorkRequestExtension request_extension = 2;
  }
}
| |
// Additional item/byte budget granted on an open GetWork stream.
// Defaults match GetWorkRequest (effectively unlimited).
message StreamingGetWorkRequestExtension {
  optional int64 max_items = 1 [default = 0xffffffff];
  optional int64 max_bytes = 2 [default = 0x7fffffffffffffff];
}
| |
// One chunk of a streamed WorkItem; a WorkItem larger than the chunk size
// is split across consecutive chunks of the same stream_id.
message StreamingGetWorkResponseChunk {
  // Not necessary for routing, but allows for optimization that if this is
  // unset it is the same as the previous chunk.
  optional ComputationWorkItemMetadata computation_metadata = 1;

  // The serialized bytes are a serialized WorkItem.
  // remaining_bytes_for_work_item indicates how many bytes of the WorkItem
  // follow in later chunks.
  optional int64 remaining_bytes_for_work_item = 2;
  optional bytes serialized_work_item = 3;

  // Indicates which response stream this message corresponds to. A sequence of
  // responses for the same stream_id should be deserialized together. Responses
  // from other stream_ids may be interleaved on the physical stream.
  optional fixed64 stream_id = 4;

  reserved 5;
}
| |
// Routing/watermark metadata for streamed WorkItem chunks.
message ComputationWorkItemMetadata {
  optional string computation_id = 1;
  // Watermarks default to the int64-minimum "-infinity" sentinel.
  optional int64 input_data_watermark = 2 [default=-0x8000000000000000];
  optional int64 dependent_realtime_input_watermark = 3
      [default = -0x8000000000000000];
}
| |
// A message on the streaming GetData request stream.
message StreamingGetDataRequest {
  // The first request on the stream should consist solely of the header. All
  // subsequent requests should not have the header set.
  optional JobHeader header = 2;
  // A request_id for each data request should be unique within a stream. All
  // response items corresponding to a data request will be identified by the
  // same request_id.
  // request_id field enumerates global data requests before state requests.
  // REQUIRES:
  //   request_id.size() = global_data_request.size() + state_request.size()
  //     for non-heartbeats
  //   request_id.empty() for heartbeats
  repeated fixed64 request_id = 1;
  repeated GlobalDataRequest global_data_request = 3;
  repeated ComputationGetDataRequest state_request = 4;
}
| |
// A message on the streaming GetData response stream, carrying one or more
// serialized responses matched to requests by request_id.
message StreamingGetDataResponse {
  // Indicates which request this message corresponds to. A sequence of
  // Responses from other requests may be interleaved on the physical stream.
  repeated fixed64 request_id = 1;

  // The serialized bytes are a one of serialized KeyedGetDataResponse or
  // GlobalData (KeyedGetDataResponse for state requests, GlobalData for
  // global-data requests, per StreamingGetDataRequest's ordering contract).
  repeated bytes serialized_response = 2;
  // Remaining bytes field applies only to the last serialized_response;
  // nonzero means the response continues in subsequent messages.
  optional int64 remaining_bytes_for_response = 3;
}
| |
// A message on the streaming CommitWork request stream.
message StreamingCommitWorkRequest {
  // The first request on the stream should consist solely of the header. All
  // subsequent requests should not have the header set.
  optional JobHeader header = 1;
  repeated StreamingCommitRequestChunk commit_chunk = 2;
}
| |
// Identifies the job/project a stream belongs to; sent once per stream.
message JobHeader {
  optional string job_id = 1;
  optional string project_id = 2;
  // Worker id is meant for logging only. Do not rely on it for other decisions.
  optional string worker_id = 3;
}
| |
// One chunk of a streamed WorkItemCommitRequest.
message StreamingCommitRequestChunk {
  // A request_id for each complete request (a set of chunks) should be unique
  // within a stream. The response corresponding to a request will be identified
  // by the same request_id.
  optional fixed64 request_id = 1;

  // Routing-level fields. Within a stream we can support the optimization that
  // an unset field has the same value as the previous chunk. This provides
  // similar benefit to our current batching without requiring separate batching
  // from transport implementation.
  optional string computation_id = 2;
  optional fixed64 sharding_key = 3;

  // Payload-level fields. The serialized bytes are a serialized
  // WorkItemCommitRequest. The request is serialized to avoid parsing in the
  // dispatcher and to allow for requests for which single fields exceed the
  // byte limit we wish to enforce for message size. The remaining bytes field
  // is used to indicate this continuation. Workers will perform a streaming
  // parse of serialized bytes to generate the complete WorkItemCommitRequest
  // before handing off to the WindmillHost for processing.
  optional int64 remaining_bytes_for_work_item = 4;
  optional bytes serialized_work_item_commit = 5;
}
| |
// A message on the streaming CommitWork response stream.
message StreamingCommitResponse {
  // Indicates which request this message corresponds to.
  repeated fixed64 request_id = 1;

  // Commit status for each request_id in the message. Indices must line up
  // with the request_id field, but trailing OKs may be omitted.
  repeated CommitStatus status = 2;
}
| |
// Unary (non-streaming) Windmill appliance API.
service WindmillAppliance {
  // Gets streaming Dataflow work.
  rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse);

  // Gets data from Windmill.
  rpc GetData(.windmill.GetDataRequest) returns (.windmill.GetDataResponse);

  // Commits previously acquired work.
  rpc CommitWork(.windmill.CommitWorkRequest)
      returns (.windmill.CommitWorkResponse);

  // Gets dependent configuration from windmill.
  rpc GetConfig(.windmill.GetConfigRequest)
      returns (.windmill.GetConfigResponse);

  // Reports stats to windmill.
  rpc ReportStats(.windmill.ReportStatsRequest)
      returns (.windmill.ReportStatsResponse);
}