| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * Protocol Buffers for metrics classes, used in the Fn API, Job API, and by SDKs. |
| */ |
| |
| syntax = "proto3"; |
| |
| package org.apache.beam.model.pipeline.v1; |
| |
| option go_package = "pipeline_v1"; |
| option java_package = "org.apache.beam.model.pipeline.v1"; |
| option java_outer_classname = "MetricsApi"; |
| |
| |
| import "beam_runner_api.proto"; |
| import "google/protobuf/descriptor.proto"; |
| import "google/protobuf/timestamp.proto"; |
| |
| // A specification containing required set of fields and labels required |
| // to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness |
| // ProcessBundleResponse reporting. |
| message MonitoringInfoSpec { |
| string urn = 1; |
| string type_urn = 2; |
| // The list of required |
| repeated string required_labels = 3; |
| // Extra non functional parts of the spec for descriptive purposes. |
| // i.e. description, units, etc. |
| repeated Annotation annotations = 4; |
| } |
| |
| // The key name and value string of MonitoringInfo annotations. |
| message Annotation { |
| string key = 1; |
| string value = 2; |
| } |
| |
| // Populated MonitoringInfoSpecs for specific URNs. |
| // Indicating the required fields to be set. |
| // SDKs and RunnerHarnesses can load these instances into memory and write a |
| // validator or code generator to assist with populating and validating |
| // MonitoringInfo protos. |
| message MonitoringInfoSpecs { |
| enum Enum { |
| // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after |
| // upgrading the python SDK. |
| USER_COUNTER = 0 [(monitoring_info_spec) = { |
| urn: "beam:metric:user", |
| type_urn: "beam:metrics:sum_int_64", |
| required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"], |
| annotations: [{ |
| key: "description", |
| value: "URN utilized to report user numeric counters." |
| }] |
| }]; |
| |
| ELEMENT_COUNT = 1 [(monitoring_info_spec) = { |
| urn: "beam:metric:element_count:v1", |
| type_urn: "beam:metrics:sum_int_64", |
| required_labels: [ "PCOLLECTION" ], |
| annotations: [ { |
| key: "description", |
| value: "The total elements output to a Pcollection by a PTransform." |
| } ] |
| }]; |
| |
| SAMPLED_BYTE_SIZE = 7 [(monitoring_info_spec) = { |
| urn: "beam:metric:sampled_byte_size:v1", |
| type_urn: "beam:metrics:distribution_int_64", |
| required_labels: [ "PCOLLECTION" ], |
| annotations: [ { |
| key: "description", |
| value: "The total byte size and count of a sampled " |
| " set (or all) of elements in the pcollection. Sampling is used " |
| " because calculating the byte count involves serializing the " |
| " elements which is CPU intensive." |
| } ] |
| }]; |
| |
| START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = { |
| urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1", |
| type_urn: "beam:metrics:sum_int_64", |
| required_labels: [ "PTRANSFORM" ], |
| annotations: [ { |
| key: "description", |
| value: "The total estimated execution time of the start bundle" |
| "function in a pardo" |
| } ] |
| }]; |
| |
| PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = { |
| urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1", |
| type_urn: "beam:metrics:sum_int_64", |
| required_labels: [ "PTRANSFORM" ], |
| annotations: [ { |
| key: "description", |
| value: "The total estimated execution time of the process bundle" |
| "function in a pardo" |
| } ] |
| }]; |
| |
| FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = { |
| urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1", |
| type_urn: "beam:metrics:sum_int_64", |
| required_labels: [ "PTRANSFORM" ], |
| annotations: [ { |
| key: "description", |
| value: "The total estimated execution time of the finish bundle " |
| "function in a pardo" |
| } ] |
| }]; |
| |
| TOTAL_MSECS = 5 [(monitoring_info_spec) = { |
| urn: "beam:metric:ptransform_execution_time:total_msecs:v1", |
| type_urn: "beam:metrics:sum_int_64", |
| required_labels: [ "PTRANSFORM" ], |
| annotations: [ { |
| key: "description", |
| value: "The total estimated execution time of the ptransform" |
| } ] |
| }]; |
| |
| // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after |
| // upgrading the python SDK. |
| USER_DISTRIBUTION_COUNTER = 6 [(monitoring_info_spec) = { |
| urn: "beam:metric:user_distribution", |
| type_urn: "beam:metrics:distribution_int_64", |
| required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"], |
| annotations: [{ |
| key: "description", |
| value: "URN utilized to report user distribution counters." |
| }] |
| }]; |
| } |
| } |
| |
| // A set of properties for the MonitoringInfoLabel, this is useful to obtain |
| // the proper label string for the MonitoringInfoLabel. |
| message MonitoringInfoLabelProps { |
| // The label key to use in the MonitoringInfo labels map. |
| string name = 1; |
| } |
| |
| // Enum extension to store MonitoringInfo related |
| // specifications, constants, etc. |
| extend google.protobuf.EnumValueOptions { |
| MonitoringInfoLabelProps label_props = 127337796; // From: commit 0x7970544. |
| |
| // Enum extension to store the MonitoringInfoSpecs. |
| MonitoringInfoSpec monitoring_info_spec = 207174266; |
| } |
| |
| message MonitoringInfo { |
| // The name defining the metric or monitored state. |
| string urn = 1; |
| |
| // This is specified as a URN that implies: |
| // A message class: (Distribution, Counter, Extrema, MonitoringDataTable). |
| // Sub types like field formats - int64, double, string. |
| // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION |
| // valid values are: |
| // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64| |
| // sum_double|latest_double|top_n_double|bottom_n_double| |
| // distribution_int_64|distribution_double|monitoring_data_table |
| string type = 2; |
| |
| // The Metric or monitored state. |
| oneof data { |
| MonitoringTableData monitoring_table_data = 3; |
| Metric metric = 4; |
| } |
| |
| enum MonitoringInfoLabels { |
| // TODO(ajamato): Rename all references from TRANSFORM to PTRANSFORM |
| |
| // The values used for TRANSFORM, PCOLLECTION, WINDOWING_STRATEGY |
| // CODER, ENVIRONMENT, etc. must always match the keys used to |
| // refer to them in the ProcessBundleDescriptor. |
| TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }]; |
| PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }]; |
| WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }]; |
| CODER = 3 [(label_props) = { name: "CODER" }]; |
| ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }]; |
| NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }]; |
| NAME = 6 [(label_props) = { name: "NAME" }]; |
| } |
| // A set of key+value labels which define the scope of the metric. |
| // Either a well defined entity id for matching the enum names in |
| // the MonitoringInfoLabels enum or any arbitrary label |
| // set by a custom metric or user metric. |
| // A monitoring system is expected to be able to aggregate the metrics |
| // together for all updates having the same URN and labels. Some systems such |
| // as Stackdriver will be able to aggregate the metrics using a subset of the |
| // provided labels |
| map<string, string> labels = 5; |
| |
| // The walltime of the most recent update. |
| // Useful for aggregation for latest types such as LatestInt64. |
| google.protobuf.Timestamp timestamp = 6; |
| } |
| |
| message MonitoringInfoTypeUrns { |
| enum Enum { |
| SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) = |
| "beam:metrics:sum_int_64"]; |
| |
| DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) = |
| "beam:metrics:distribution_int_64"]; |
| |
| LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) = |
| "beam:metrics:latest_int_64"]; |
| } |
| } |
| |
| message Metric { |
| // (Required) The data for this metric. |
| oneof data { |
| CounterData counter_data = 1; |
| DistributionData distribution_data = 2; |
| ExtremaData extrema_data = 3; |
| } |
| } |
| |
| // Data associated with a Counter or Gauge metric. |
| // This is designed to be compatible with metric collection |
| // systems such as DropWizard. |
| message CounterData { |
| oneof value { |
| int64 int64_value = 1; |
| double double_value = 2; |
| string string_value = 3; |
| } |
| } |
| |
| // Extrema messages are used for calculating |
| // Top-N/Bottom-N metrics. |
| message ExtremaData { |
| oneof extrema { |
| IntExtremaData int_extrema_data = 1; |
| DoubleExtremaData double_extrema_data = 2; |
| } |
| } |
| |
| message IntExtremaData { |
| repeated int64 int_values = 1; |
| } |
| |
| message DoubleExtremaData { |
| repeated double double_values = 2; |
| } |
| |
| // Data associated with a distribution metric. |
| // This is based off of the current DistributionData metric. |
| // This is not a stackdriver or dropwizard compatible |
| // style of distribution metric. |
| message DistributionData { |
| oneof distribution { |
| IntDistributionData int_distribution_data = 1; |
| DoubleDistributionData double_distribution_data = 2; |
| } |
| } |
| |
| message IntDistributionData { |
| int64 count = 1; |
| int64 sum = 2; |
| int64 min = 3; |
| int64 max = 4; |
| } |
| |
| message DoubleDistributionData { |
| int64 count = 1; |
| double sum = 2; |
| double min = 3; |
| double max = 4; |
| } |
| |
| // General MonitoredState information which contains |
| // structured information which does not fit into a typical |
| // metric format. For example, a table of important files |
| // and metadata which an I/O source is reading. |
| // Note: Since MonitoredState is designed to be |
| // customizable, and allow engines to aggregate these |
| // metrics in custom ways. |
| // Engines can use custom aggregation functions for specific URNs |
| // by inspecting the column values. |
| // An SDK should always report its current state, that is all |
| // relevant MonitoredState for its PTransform at the current moment |
| // and this should be kept small. |
| // For example, an SDK can emit the oldest three files which |
| // have been waiting for data for over 1 hour. |
| // If an engine supports the URN with a custom aggregation then |
| // it can filter these and keep only the Top-3 rows based on |
| // how long the files have been waiting for data. |
| // Otherwise an engine can ignore the MonitoringTableData |
| // or union all the rows together into one large table and display |
| // them in a UI. |
| message MonitoringTableData { |
| message MonitoringColumnValue { |
| oneof value { |
| int64 int64_value = 1; |
| double double_value = 2; |
| string string_value = 3; |
| google.protobuf.Timestamp timestamp = 4; |
| } |
| } |
| |
| message MonitoringRow { |
| repeated MonitoringColumnValue values = 1; |
| } |
| |
| // The number of column names must match the |
| // number of values in each MonitoringRow. |
| repeated string column_names = 1; |
| repeated MonitoringRow row_data = 2; |
| } |
| |