blob: 02fd5ae7884d0dcfcca42ed30db42ac98de10bde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Protocol Buffers for metrics classes, used in the Fn API, Job API, and by SDKs.
*/
syntax = "proto3";
package org.apache.beam.model.pipeline.v1;
option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "MetricsApi";
import "beam_runner_api.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";
// A specification of the fields and labels that must be set on a
// MonitoringInfo with a specific URN, for SDK->RunnerHarness
// ProcessBundleResponse reporting.
message MonitoringInfoSpec {
  // The MonitoringInfo URN this spec applies to,
  // e.g. "beam:metric:element_count:v1".
  string urn = 1;

  // The type URN the MonitoringInfo is expected to carry,
  // e.g. "beam:metrics:sum_int_64" (one of the values listed for
  // MonitoringInfo.type).
  string type_urn = 2;

  // The list of label keys that must be present in the MonitoringInfo's
  // labels map, e.g. "PTRANSFORM" or "PCOLLECTION".
  repeated string required_labels = 3;

  // Extra non-functional parts of the spec for descriptive purposes,
  // e.g. description, units, etc.
  repeated Annotation annotations = 4;
}
// One key/value annotation attached to a MonitoringInfo spec, for example
// key "description" paired with a human-readable explanation.
message Annotation {
  // Name of the annotation, such as "description".
  string key = 1;

  // Text associated with the key.
  string value = 2;
}
// Populated MonitoringInfoSpecs for specific URNs, indicating the fields
// that are required to be set.
// SDKs and RunnerHarnesses can load these instances into memory and write a
// validator or code generator to assist with populating and validating
// MonitoringInfo protos.
message MonitoringInfoSpecs {
  enum Enum {
    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
    // upgrading the python SDK.
    // NOTE(review): "PTRANSFORM" is already listed in required_labels below,
    // so this TODO appears to be done -- confirm and remove.
    USER_COUNTER = 0 [(monitoring_info_spec) = {
      urn: "beam:metric:user",
      type_urn: "beam:metrics:sum_int_64",
      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
      annotations: [{
        key: "description",
        value: "URN utilized to report user numeric counters."
      }]
    }];

    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
      urn: "beam:metric:element_count:v1",
      type_urn: "beam:metrics:sum_int_64",
      required_labels: [ "PCOLLECTION" ],
      annotations: [ {
        key: "description",
        value: "The total elements output to a Pcollection by a PTransform."
      } ]
    }];

    // The description below is built from adjacent string literals, which
    // are concatenated verbatim; each piece must carry exactly one
    // separating space (previously doubled).
    SAMPLED_BYTE_SIZE = 7 [(monitoring_info_spec) = {
      urn: "beam:metric:sampled_byte_size:v1",
      type_urn: "beam:metrics:distribution_int_64",
      required_labels: [ "PCOLLECTION" ],
      annotations: [ {
        key: "description",
        value: "The total byte size and count of a sampled "
               "set (or all) of elements in the pcollection. Sampling is used "
               "because calculating the byte count involves serializing the "
               "elements which is CPU intensive."
      } ]
    }];

    // Trailing space added so the concatenated description does not read
    // "start bundlefunction".
    START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
      urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
      type_urn: "beam:metrics:sum_int_64",
      required_labels: [ "PTRANSFORM" ],
      annotations: [ {
        key: "description",
        value: "The total estimated execution time of the start bundle "
               "function in a pardo"
      } ]
    }];

    // Trailing space added so the concatenated description does not read
    // "process bundlefunction".
    PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = {
      urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
      type_urn: "beam:metrics:sum_int_64",
      required_labels: [ "PTRANSFORM" ],
      annotations: [ {
        key: "description",
        value: "The total estimated execution time of the process bundle "
               "function in a pardo"
      } ]
    }];

    FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = {
      urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
      type_urn: "beam:metrics:sum_int_64",
      required_labels: [ "PTRANSFORM" ],
      annotations: [ {
        key: "description",
        value: "The total estimated execution time of the finish bundle "
               "function in a pardo"
      } ]
    }];

    TOTAL_MSECS = 5 [(monitoring_info_spec) = {
      urn: "beam:metric:ptransform_execution_time:total_msecs:v1",
      type_urn: "beam:metrics:sum_int_64",
      required_labels: [ "PTRANSFORM" ],
      annotations: [ {
        key: "description",
        value: "The total estimated execution time of the ptransform"
      } ]
    }];

    // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
    // upgrading the python SDK.
    // NOTE(review): "PTRANSFORM" is already listed in required_labels below,
    // so this TODO appears to be done -- confirm and remove.
    USER_DISTRIBUTION_COUNTER = 6 [(monitoring_info_spec) = {
      urn: "beam:metric:user_distribution",
      type_urn: "beam:metrics:distribution_int_64",
      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
      annotations: [{
        key: "description",
        value: "URN utilized to report user distribution counters."
      }]
    }];
  }
}
// Properties attached to each MonitoringInfoLabels value; used to look up
// the exact label string for that label.
message MonitoringInfoLabelProps {
  // Key under which this label is stored in the MonitoringInfo labels map.
  string name = 1;
}
// Custom options on enum values, used to attach MonitoringInfo-related
// specifications, constants, etc. directly to the enum values that name them.
extend google.protobuf.EnumValueOptions {
  // Label properties; attached in this file to the values of
  // MonitoringInfo.MonitoringInfoLabels. From: commit 0x7970544.
  MonitoringInfoLabelProps label_props = 127337796;

  // The MonitoringInfoSpec; attached in this file to the values of
  // MonitoringInfoSpecs.Enum.
  MonitoringInfoSpec monitoring_info_spec = 207174266;
}
// A single reported metric or piece of monitored state, identified by a
// URN, typed by a type URN, and scoped by a set of labels.
message MonitoringInfo {
  // The name defining the metric or monitored state.
  string urn = 1;

  // This is specified as a URN that implies:
  // A message class: (DistributionData, CounterData, ExtremaData,
  //   MonitoringTableData).
  // Sub types like field formats - int64, double, string.
  // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION.
  // Valid values are:
  // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
  //   sum_double|latest_double|top_n_double|bottom_n_double|
  //   distribution_int_64|distribution_double|monitoring_data_table]
  string type = 2;

  // The Metric or monitored state.
  oneof data {
    MonitoringTableData monitoring_table_data = 3;
    Metric metric = 4;
  }

  // Well-known label keys. Each value's (label_props).name is the string
  // key to use in the labels map below.
  enum MonitoringInfoLabels {
    // TODO(ajamato): Rename all references from TRANSFORM to PTRANSFORM.
    // The values used for TRANSFORM, PCOLLECTION, WINDOWING_STRATEGY,
    // CODER, ENVIRONMENT, etc. must always match the keys used to
    // refer to them in the ProcessBundleDescriptor.
    TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }];
    PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }];
    WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
    CODER = 3 [(label_props) = { name: "CODER" }];
    ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
    NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
    NAME = 6 [(label_props) = { name: "NAME" }];
  }

  // A set of key+value labels which define the scope of the metric.
  // Entries are either well-defined entity ids keyed by the label names of
  // the MonitoringInfoLabels enum, or arbitrary labels set by a custom
  // metric or user metric.
  // A monitoring system is expected to be able to aggregate the metrics
  // together for all updates having the same URN and labels. Some systems,
  // such as Stackdriver, will be able to aggregate the metrics using a
  // subset of the provided labels.
  map<string, string> labels = 5;

  // The walltime of the most recent update.
  // Useful for aggregation for latest types such as LatestInt64.
  google.protobuf.Timestamp timestamp = 6;
}
// Type URNs for MonitoringInfo.type, each attached to an enum value through
// the beam_urn option.
message MonitoringInfoTypeUrns {
  enum Enum {
    // Summed int64 counter.
    SUM_INT64_TYPE = 0
        [(org.apache.beam.model.pipeline.v1.beam_urn) =
             "beam:metrics:sum_int_64"];

    // int64 distribution.
    DISTRIBUTION_INT64_TYPE = 1
        [(org.apache.beam.model.pipeline.v1.beam_urn) =
             "beam:metrics:distribution_int_64"];

    // Latest-value int64.
    LATEST_INT64_TYPE = 2
        [(org.apache.beam.model.pipeline.v1.beam_urn) =
             "beam:metrics:latest_int_64"];
  }
}
// Numeric metric payload of a MonitoringInfo: exactly one of a counter,
// distribution or extrema value.
message Metric {
  // (Required) The data for this metric.
  oneof data {
    CounterData counter_data = 1;
    DistributionData distribution_data = 2;
    ExtremaData extrema_data = 3;
  }
}
// Value of a Counter or Gauge metric, designed to be compatible with
// metric collection systems such as DropWizard. Holds one of an integer,
// floating-point or string value.
message CounterData {
  oneof value {
    int64 int64_value = 1;
    double double_value = 2;
    string string_value = 3;
  }
}
// Data for Top-N / Bottom-N style metrics, holding either integer or
// floating-point extrema.
message ExtremaData {
  oneof extrema {
    IntExtremaData int_extrema_data = 1;
    DoubleExtremaData double_extrema_data = 2;
  }
}
// Integer extrema values.
message IntExtremaData {
  repeated int64 int_values = 1;
}
// Floating-point extrema values.
message DoubleExtremaData {
  // Uses field number 2 (there is no field 1); the number is the wire tag
  // and must not be changed.
  repeated double double_values = 2;
}
// Value of a distribution metric, in integer or floating-point form.
// Modeled on the current DistributionData metric; it is not a Stackdriver-
// or DropWizard-compatible style of distribution metric.
message DistributionData {
  oneof distribution {
    IntDistributionData int_distribution_data = 1;
    DoubleDistributionData double_distribution_data = 2;
  }
}
// Integer distribution summary: count, sum, min and max.
message IntDistributionData {
  int64 count = 1;
  int64 sum = 2;
  int64 min = 3;
  int64 max = 4;
}
// Floating-point distribution summary: count, sum, min and max.
message DoubleDistributionData {
  // The count is an integer even though the other statistics are doubles.
  int64 count = 1;
  double sum = 2;
  double min = 3;
  double max = 4;
}
// General MonitoredState information which contains
// structured information which does not fit into a typical
// metric format. For example, a table of important files
// and metadata which an I/O source is reading.
// Note: MonitoredState is designed to be customizable, allowing
// engines to aggregate these metrics in custom ways.
// Engines can use custom aggregation functions for specific URNs
// by inspecting the column values.
// An SDK should always report its current state, that is, all
// relevant MonitoredState for its PTransform at the current moment,
// and this should be kept small.
// For example, an SDK can emit the oldest three files which
// have been waiting for data for over 1 hour.
// If an engine supports the URN with a custom aggregation, then
// it can filter these and keep only the Top-3 rows based on
// how long the files have been waiting for data.
// Otherwise an engine can ignore the MonitoringTableData,
// or union all the rows together into one large table and display
// them in a UI.
message MonitoringTableData {
  // A single cell value: an integer, double, string or timestamp.
  message MonitoringColumnValue {
    oneof value {
      int64 int64_value = 1;
      double double_value = 2;
      string string_value = 3;
      google.protobuf.Timestamp timestamp = 4;
    }
  }

  // One table row; values are positional, matching column_names.
  message MonitoringRow {
    repeated MonitoringColumnValue values = 1;
  }

  // The number of column names must match the
  // number of values in each MonitoringRow.
  repeated string column_names = 1;

  // The table rows.
  repeated MonitoringRow row_data = 2;
}