blob: 598164ec69d175c7bac2ab9cf75733c6457edcef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
package windmill;
option java_package = "org.apache.beam.runners.dataflow.worker.windmill";
option java_outer_classname = "Windmill";
////////////////////////////////////////////////////////////////////////////////
// API Data types
// A single timestamped data element flowing through a computation.
message Message {
// Event timestamp; default is INT64_MIN (-2^63), used as a "minimum
// timestamp" sentinel. Unit is not stated here — presumably microseconds
// since epoch; confirm against the harness encoding.
required int64 timestamp = 1 [default=-0x8000000000000000];
// Serialized element payload.
required bytes data = 2;
// Optional opaque metadata accompanying the payload.
optional bytes metadata = 3;
}
// A timer set (or fired) for a key, identified by an opaque tag.
message Timer {
// Opaque identifier for this timer within the key.
required bytes tag = 1;
// Firing timestamp; default is the INT64_MIN sentinel.
optional int64 timestamp = 2 [default=-0x8000000000000000];
// Time domain the timer fires in.
enum Type {
WATERMARK = 0;
REALTIME = 1;
DEPENDENT_REALTIME = 2;
}
optional Type type = 3 [default = WATERMARK];
// State family (namespace) this timer belongs to.
optional string state_family = 4;
}
// Messages delivered to a computation, grouped by the upstream
// computation that produced them.
message InputMessageBundle {
// Id of the computation that emitted these messages.
required string source_computation_id = 1;
repeated Message messages = 2;
}
// Messages destined for a single key (and optional sharding key).
message KeyedMessageBundle {
required bytes key = 1;
optional fixed64 sharding_key = 4;
repeated Message messages = 2;
// NOTE(review): name reads as a typo for "message_ids", but renaming a
// published field would break JSON/codegen; keep as-is. Presumably one id
// per entry in `messages` — confirm with producers.
repeated bytes messages_ids = 3;
}
// Output messages produced by a work item, addressed either to a
// downstream computation or to a stream.
message OutputMessageBundle {
// Exactly how these two destination fields interact is not shown here;
// presumably at most one is set — confirm with the service.
optional string destination_computation_id = 1;
optional string destination_stream_id = 3;
repeated KeyedMessageBundle bundles = 2;
}
// Messages to be published to a Cloud Pub/Sub topic.
message PubSubMessageBundle {
required string topic = 1;
repeated Message messages = 2;
// Attribute names used to carry the event timestamp / dedup id.
optional string timestamp_label = 3;
optional string id_label = 4;
// If set, then each message is interpreted as a PubsubMessage proto,
// containing a payload and attributes.
optional bool with_attributes = 5;
}
// A set of timers delivered together with a WorkItem.
message TimerBundle {
repeated Timer timers = 1;
}
// A timestamped value stored in per-key state.
message Value {
// Default is the INT64_MIN sentinel.
required int64 timestamp = 1 [default=-0x8000000000000000];
required bytes data = 2;
}
// A single-value state cell, addressed by an opaque tag.
message TagValue {
required bytes tag = 1;
// Unset value presumably means "delete/absent" in a write — confirm.
optional Value value = 2;
optional string state_family = 3;
}
// A bag (unordered multiset) state cell, addressed by an opaque tag.
// Used both in read requests/responses and in commit-time updates; the
// per-field comments below distinguish the two directions.
message TagBag {
optional bytes tag = 1;
// In request: All existing items in the list will be deleted. If new values
// are present they will be written.
optional bool delete_all = 2;
// In request: The given values will be written to the collection.
// In response: Values that are present in the collection.
repeated bytes values = 3;
optional string state_family = 4;
// In request: A previously returned continuation_position from an earlier
// read response. Indicates we wish to fetch the next page of values.
// In response: Copied from request.
optional int64 request_position = 7;
// In response only: Set when there are values after those returned above, but
// they were suppressed to respect the fetch_max_bytes limit. Subsequent
// requests should copy this to request_position to retrieve the next page of
// values.
optional int64 continuation_position = 5;
// In request: Limits the size of the fetched bag to this byte limit.
// Default is INT64_MAX, i.e. effectively unlimited.
optional int64 fetch_max_bytes = 6 [default = 0x7fffffffffffffff];
}
// Identifies a versioned piece of global (cross-key) data, e.g. a side
// input window.
message GlobalDataId {
required string tag = 1;
required bytes version = 2;
}
// A piece of global data together with its readiness state.
message GlobalData {
required GlobalDataId data_id = 1;
// Whether the data is available to be read; when false, `data` is
// presumably absent — confirm with the service contract.
optional bool is_ready = 2;
optional bytes data = 3;
optional string state_family = 4;
}
// Persisted state for an unbounded source reader, plus finalization ids.
message SourceState {
// Opaque serialized reader checkpoint.
optional bytes state = 1;
// Checkpoint ids that may now be finalized by the source.
repeated fixed64 finalize_ids = 2;
// If true, only perform finalization; do not update `state`.
optional bool only_finalize = 3;
}
// Output-watermark holds for a key, addressed by an opaque tag.
message WatermarkHold {
required bytes tag = 1;
// Hold timestamps; packed varint encoding for compactness.
repeated int64 timestamps = 2 [packed=true];
// If true, existing holds for this tag are replaced rather than merged —
// presumably; confirm with commit semantics.
optional bool reset = 3;
optional string state_family = 4;
}
// Proto describing a hot key detected on a given WorkItem.
message HotKeyInfo {
// The age of the hot key measured from when it was first detected.
optional int64 hot_key_age_usec = 1;
}
// A unit of work for a single key: input messages, fired timers, and
// related notifications. Identified by (key, work_token).
message WorkItem {
required bytes key = 1;
// Token identifying this lease of the key; echoed back in GetData and
// CommitWork requests for validation.
required fixed64 work_token = 2;
optional fixed64 sharding_key = 9;
// Token scoping the validity of worker-side cached state for this key.
optional fixed64 cache_token = 7;
repeated InputMessageBundle message_bundles = 3;
optional TimerBundle timers = 4;
// Global data ids whose readiness this work item is being notified about.
repeated GlobalDataId global_data_id_notifications = 5;
optional SourceState source_state = 6;
// Default is the INT64_MIN sentinel.
optional int64 output_data_watermark = 8 [default=-0x8000000000000000];
// Indicates that this is a new key with no data associated. This allows
// the harness to optimize data fetching.
optional bool is_new_key = 10;
// A hot key is a symptom of poor data distribution in which there are enough
// elements mapped to a single key to impact pipeline performance. When
// present, this field includes metadata associated with any hot key.
optional HotKeyInfo hot_key_info = 11;
}
// Work items for one computation, with its input watermarks.
message ComputationWorkItems {
required string computation_id = 1;
repeated WorkItem work = 2;
// Watermarks default to the INT64_MIN sentinel ("unknown/minimum").
optional int64 input_data_watermark = 3 [default=-0x8000000000000000];
optional int64 dependent_realtime_input_watermark = 4
[default = -0x8000000000000000];
}
////////////////////////////////////////////////////////////////////////////////
// API calls
// GetWork
// Request to lease work items from the service.
message GetWorkRequest {
// Identifies the requesting worker process across calls.
required fixed64 client_id = 1;
optional string worker_id = 4;
optional string job_id = 5;
optional string project_id = 7;
// Upper bounds on the amount of work returned. max_items defaults to
// 0xffffffff (~4.3e9, effectively unlimited); max_bytes to INT64_MAX.
optional int64 max_items = 2 [default = 0xffffffff];
optional int64 max_bytes = 3 [default = 0x7fffffffffffffff];
// Field 6 was removed; reserved to prevent reuse.
reserved 6;
}
// Response to GetWork: leased work items grouped by computation.
message GetWorkResponse {
repeated ComputationWorkItems work = 1;
}
// GetData
// State-fetch request for a single key under an active work token.
message KeyedGetDataRequest {
required bytes key = 1;
// Must match the work_token of the WorkItem being processed.
required fixed64 work_token = 2;
optional fixed64 sharding_key = 6;
repeated TagValue values_to_fetch = 3;
repeated TagBag bags_to_fetch = 8;
repeated WatermarkHold watermark_holds_to_fetch = 5;
// Byte budget for this key's response; no default visible here.
optional int64 max_bytes = 7;
// Field 4 was removed; reserved to prevent reuse.
reserved 4;
}
// Per-computation grouping of keyed state-fetch requests.
message ComputationGetDataRequest {
required string computation_id = 1;
repeated KeyedGetDataRequest requests = 2;
}
// Top-level GetData request: keyed state fetches plus global data fetches.
message GetDataRequest {
optional string job_id = 4;
optional string project_id = 5;
repeated ComputationGetDataRequest requests = 1;
repeated GlobalDataRequest global_data_fetch_requests = 3;
// DEPRECATED — superseded by global_data_fetch_requests above.
repeated GlobalDataId global_data_to_fetch = 2;
}
// State-fetch response for a single key.
message KeyedGetDataResponse {
required bytes key = 1;
// The response for this key is not populated due to the fetch failing.
optional bool failed = 2;
repeated TagValue values = 3;
repeated TagBag bags = 6;
repeated WatermarkHold watermark_holds = 5;
// Field 4 was removed; reserved to prevent reuse.
reserved 4;
}
// Per-computation grouping of keyed state-fetch responses.
message ComputationGetDataResponse {
required string computation_id = 1;
repeated KeyedGetDataResponse data = 2;
}
// Top-level GetData response: keyed state plus global data.
message GetDataResponse {
repeated ComputationGetDataResponse data = 1;
repeated GlobalData global_data = 2;
}
// CommitWork
// A named metric update reported alongside work progress.
// NOTE(review): field number 5 is skipped but not reserved — presumably a
// retired field; consider `reserved 5;` after confirming no other writers.
message Counter {
// Metric name.
optional string name = 1;
// Aggregation applied across reported updates.
enum Kind {
SUM = 0;
MAX = 1;
MIN = 2;
MEAN = 3;
}
optional Kind kind = 2;
// For SUM, MAX, MIN, AND, OR, MEAN at most one of the following should be
// set. For MEAN it is the sum
optional double double_scalar = 3;
optional int64 int_scalar = 4;
// Only set for MEAN. Count of elements contributing to the sum.
optional int64 mean_count = 6;
// True if this metric is reported as the total cumulative aggregate
// value accumulated since the worker started working on this WorkItem.
// By default this is false, indicating that this metric is reported
// as a delta that is not associated with any WorkItem.
optional bool cumulative = 7;
}
// Request for a piece of global data, with an existence deadline.
message GlobalDataRequest {
required GlobalDataId data_id = 1;
// Default is INT64_MAX, i.e. no deadline.
optional int64 existence_watermark_deadline = 2 [default=0x7FFFFFFFFFFFFFFF];
optional string state_family = 3;
}
// next id: 24
// Commits the results of processing a single WorkItem: outputs, state
// mutations, counters, and source-state updates, all applied atomically.
message WorkItemCommitRequest {
required bytes key = 1;
// Must match the WorkItem's work_token for the commit to validate.
required fixed64 work_token = 2;
optional fixed64 sharding_key = 15;
optional fixed64 cache_token = 16;
// Set when the commit was truncated because it exceeded the per-item
// byte limit; the estimated size is reported alongside.
optional bool exceeds_max_work_item_commit_bytes = 20;
optional int64 estimated_work_item_commit_bytes = 21;
repeated OutputMessageBundle output_messages = 3;
repeated PubSubMessageBundle pubsub_messages = 7;
repeated Timer output_timers = 4;
repeated TagValue value_updates = 5;
repeated TagBag bag_updates = 18;
repeated Counter counter_updates = 8;
repeated GlobalDataRequest global_data_requests = 11;
repeated GlobalData global_data_updates = 10;
optional SourceState source_state_updates = 12;
// Watermark/backlog defaults are sentinels: INT64_MIN and -1 ("unknown").
optional int64 source_watermark = 13 [default=-0x8000000000000000];
optional int64 source_backlog_bytes = 17 [default=-1];
optional int64 source_bytes_processed = 22;
repeated WatermarkHold watermark_holds = 14;
// DEPRECATED — superseded by global_data_requests above.
repeated GlobalDataId global_data_id_requests = 9;
// Removed fields; reserved to prevent reuse.
reserved 6, 19, 23;
}
// Per-computation grouping of work-item commits.
message ComputationCommitWorkRequest {
required string computation_id = 1;
repeated WorkItemCommitRequest requests = 2;
}
// Top-level CommitWork request.
message CommitWorkRequest {
optional string job_id = 2;
optional string project_id = 3;
repeated ComputationCommitWorkRequest requests = 1;
}
// Validation status of applying a single WorkItem.
// Statuses earlier in this list supersede later ones.
// NOTE: values are unprefixed (pre-dates the ENUM_NAME_VALUE convention);
// renaming published values would break generated code, so they stay.
enum CommitStatus {
// Default proto value; do not use.
DEFAULT = 0;
// Commit succeeded.
OK = 1;
// Some requested resource to modify was not found.
NOT_FOUND = 2;
// Some part of this request exceeded size limits.
TOO_LARGE = 3;
// Token for this commit is invalid.
INVALID_TOKEN = 4;
// This key and token is already committing.
ALREADY_IN_COMMIT = 5;
// The CommitWork request had validation errors.
VALIDATION_FAILED = 6;
// The request was aborted.
ABORTED = 7;
}
message CommitWorkResponse {}
// Configuration
// Requests configuration for the named computations.
message GetConfigRequest {
optional string job_id = 2;
repeated string computations = 1;
}
// Per-computation configuration delivered via GetConfig.
message ComputationConfig {
// Map from user name of stateful transforms in this stage to their state
// family. (Encoded as a repeated entry message rather than a proto map.)
message TransformUserNameToStateFamilyEntry {
optional string transform_user_name = 1;
optional string state_family = 2;
}
repeated TransformUserNameToStateFamilyEntry
transform_user_name_to_state_family = 1;
}
// Configuration returned by GetConfig. The "maps" below are encoded as
// repeated entry messages rather than proto map fields.
message GetConfigResponse {
// Serialized cloud work specifications — format not visible here.
repeated string cloud_works = 1;
message NameMapEntry {
optional string user_name = 1;
optional string system_name = 2;
}
// Map of user names to system names
repeated NameMapEntry name_map = 2;
message SystemNameToComputationIdMapEntry {
optional string system_name = 1;
optional string computation_id = 2;
}
repeated SystemNameToComputationIdMapEntry
system_name_to_computation_id_map = 3;
// Map of computation id to ComputationConfig.
message ComputationConfigMapEntry {
optional string computation_id = 1;
optional ComputationConfig computation_config = 2;
}
repeated ComputationConfigMapEntry computation_config_map = 4;
}
// Reporting
// A (possibly chained) exception report; `cause` recurses for nested
// causes, so consumers should bound recursion depth when traversing.
message Exception {
repeated string stack_frames = 1;
optional Exception cause = 2;
}
// Reports processing stats (exceptions and counters) for a work item.
message ReportStatsRequest {
optional string job_id = 6;
optional string computation_id = 1;
optional bytes key = 2;
optional fixed64 work_token = 3;
optional fixed64 sharding_key = 7;
repeated Exception exceptions = 4;
repeated Counter counter_updates = 5;
}
// Response to ReportStats.
message ReportStatsResponse {
// If true, the service considers the reported work item failed —
// presumably the worker should abandon it; confirm with callers.
optional bool failed = 1;
}
////////////////////////////////////////////////////////////////////////////////
// Streaming API
// Request message on a streaming GetWork stream: the first message carries
// the initial GetWorkRequest; later messages extend the open budgets.
message StreamingGetWorkRequest {
oneof chunk_type {
GetWorkRequest request = 1;
StreamingGetWorkRequestExtension request_extension = 2;
}
}
// Extends the item/byte budgets of an in-flight streaming GetWork stream.
// Defaults mirror GetWorkRequest: 0xffffffff items, INT64_MAX bytes.
message StreamingGetWorkRequestExtension {
optional int64 max_items = 1 [default = 0xffffffff];
optional int64 max_bytes = 2 [default = 0x7fffffffffffffff];
}
// One chunk of a streaming GetWork response. A WorkItem may span multiple
// chunks; remaining_bytes_for_work_item indicates the continuation.
message StreamingGetWorkResponseChunk {
// Not necessary for routing, but allows for optimization that if this is
// unset it is the same as the previous chunk.
optional ComputationWorkItemMetadata computation_metadata = 1;
// The serialized bytes are a serialized WorkItem.
// Bytes of the WorkItem still to come in subsequent chunks (0/unset for
// the final chunk of an item).
optional int64 remaining_bytes_for_work_item = 2;
optional bytes serialized_work_item = 3;
// Indicates which response stream this message corresponds to. A sequence of
// responses for the same stream_id should be deserialized together. Responses
// from other stream_ids may be interleaved on the physical stream.
optional fixed64 stream_id = 4;
// reserved field 5
}
// Computation id and watermarks attached to streamed work-item chunks.
// Watermarks default to the INT64_MIN sentinel.
message ComputationWorkItemMetadata {
optional string computation_id = 1;
optional int64 input_data_watermark = 2 [default=-0x8000000000000000];
optional int64 dependent_realtime_input_watermark = 3
[default = -0x8000000000000000];
}
// Request message on a streaming GetData stream. Carries either the
// stream header (first message only) or a batch of data requests, each
// tagged with a caller-chosen request_id.
message StreamingGetDataRequest {
// The first request on the stream should consist solely of the header. All
// subsequent requests should not have the header set.
optional JobHeader header = 2;
// A request_id for each data request should be unique within a stream. All
// response items corresponding to a data request will be identified by the
// same request_id.
// request_id field enumerates global data requests before state requests.
// REQUIRES:
// request_id.size() = global_data_request.size() + state_request.size()
// for non-heartbeats
// request_id.empty() for heartbeats
repeated fixed64 request_id = 1;
repeated GlobalDataRequest global_data_request = 3;
repeated ComputationGetDataRequest state_request = 4;
}
// Response message on a streaming GetData stream; entries pair up with
// request_ids from the request side.
message StreamingGetDataResponse {
// Indicates which request this message corresponds to. A sequence of
// Responses from other requests may be interleaved on the physical stream.
repeated fixed64 request_id = 1;
// The serialized bytes are a one of serialized KeyedGetDataResponse or
// GlobalData.
repeated bytes serialized_response = 2;
// Remaining bytes field applies only to the last serialized_response
optional int64 remaining_bytes_for_response = 3;
}
// Request message on a streaming CommitWork stream.
message StreamingCommitWorkRequest {
// The first request on the stream should consist solely of the header. All
// subsequent requests should not have the header set.
optional JobHeader header = 1;
repeated StreamingCommitRequestChunk commit_chunk = 2;
}
// Identifies the job a streaming RPC belongs to; sent once per stream.
message JobHeader {
optional string job_id = 1;
optional string project_id = 2;
// Worker id is meant for logging only. Do not rely on it for other decisions.
optional string worker_id = 3;
}
// One chunk of a streamed WorkItemCommitRequest; a logical commit may span
// several chunks identified by the same request_id.
message StreamingCommitRequestChunk {
// A request_id for each complete request (a set of chunks) should be unique
// within a stream. The response corresponding to a request will be identified
// by the same request_id.
optional fixed64 request_id = 1;
// Routing-level fields. Within a stream we can support the optimization that
// an unset field has the same value as the previous chunk. This provides
// similar benefit to our current batching without requiring separate batching
// from transport implementation.
optional string computation_id = 2;
optional fixed64 sharding_key = 3;
// Payload-level fields. The serialized bytes are a serialized
// WorkItemCommitRequest. The request is serialized to avoid parsing in the
// dispatcher and to allow for requests for which single fields exceed the
// byte limit we wish to enforce for message size. The remaining bytes field
// is used to indicate this continuation. Workers will perform a streaming
// parse of serialized bytes to generate the complete WorkItemCommitRequest
// before handing off to the WindmillHost for processing.
optional int64 remaining_bytes_for_work_item = 4;
optional bytes serialized_work_item_commit = 5;
}
// Response message on a streaming CommitWork stream.
message StreamingCommitResponse {
// Indicates which request this message corresponds to.
repeated fixed64 request_id = 1;
// Commit status for each request_id in the message. Indices must line up
// with the request_id field, but trailing OKs may be omitted.
repeated CommitStatus status = 2;
}
// Unary (non-streaming) Windmill appliance API: lease work, fetch state,
// commit results, and exchange config/stats.
service WindmillAppliance {
// Gets streaming Dataflow work.
rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse);
// Gets data from Windmill.
rpc GetData(.windmill.GetDataRequest) returns (.windmill.GetDataResponse);
// Commits previously acquired work.
rpc CommitWork(.windmill.CommitWorkRequest)
returns (.windmill.CommitWorkResponse);
// Gets dependant configuration from windmill.
rpc GetConfig(.windmill.GetConfigRequest)
returns (.windmill.GetConfigResponse);
// Reports stats to windmill.
rpc ReportStats(.windmill.ReportStatsRequest)
returns (.windmill.ReportStatsResponse);
}