blob: c35a644da5d545e76254c6d3409224c8a2d0c3e3 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
* Protocol Buffers for metrics classes, used in the Fn API, Job API, and by SDKs.
syntax = "proto3";
package org.apache.beam.model.pipeline.v1;
option go_package = ";pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "MetricsApi";
import "beam_runner_api.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";
// A specification for describing a well known MonitoringInfo.
// All specifications are uniquely identified by the urn.
message MonitoringInfoSpec {
// Defines the semantic meaning of the metric or monitored state.
// See MonitoringInfoSpecs.Enum for the set of well known metrics/monitored
// state.
string urn = 1;
// Defines the required encoding and aggregation method for the payload.
// See MonitoringInfoTypeUrns.Enum for the set of well known types.
string type = 2;
// The list of required labels for the specified urn and type.
repeated string required_labels = 3;
// Extra non functional parts of the spec for descriptive purposes.
// i.e. description, units, etc.
repeated Annotation annotations = 4;
// The key name and value string of MonitoringInfo annotations.
message Annotation {
string key = 1;
string value = 2;
// A set of well known MonitoringInfo specifications.
message MonitoringInfoSpecs {
enum Enum {
// Represents an integer counter where values are summed across bundles.
USER_SUM_INT64 = 0 [(monitoring_info_spec) = {
urn: "beam:metric:user:sum_int64:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents a double counter where values are summed across bundles.
USER_SUM_DOUBLE = 1 [(monitoring_info_spec) = {
urn: "beam:metric:user:sum_double:v1",
type: "beam:metrics:sum_double:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents a distribution of an integer value where:
// - count: represents the number of values seen across all bundles
// - sum: represents the total of the value across all bundles
// - min: represents the smallest value seen across all bundles
// - max: represents the largest value seen across all bundles
USER_DISTRIBUTION_INT64 = 2 [(monitoring_info_spec) = {
urn: "beam:metric:user:distribution_int64:v1",
type: "beam:metrics:distribution_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents a distribution of a double value where:
// - count: represents the number of values seen across all bundles
// - sum: represents the total of the value across all bundles
// - min: represents the smallest value seen across all bundles
// - max: represents the largest value seen across all bundles
USER_DISTRIBUTION_DOUBLE = 3 [(monitoring_info_spec) = {
urn: "beam:metric:user:distribution_double:v1",
type: "beam:metrics:distribution_double:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents the latest seen integer value. The timestamp is used to
// provide an "ordering" over multiple values to determine which is the
// latest.
USER_LATEST_INT64 = 4 [(monitoring_info_spec) = {
urn: "beam:metric:user:latest_int64:v1",
type: "beam:metrics:latest_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents the latest seen double value. The timestamp is used to
// provide an "ordering" over multiple values to determine which is the
// latest.
USER_LATEST_DOUBLE = 5 [(monitoring_info_spec) = {
urn: "beam:metric:user:latest_double:v1",
type: "beam:metrics:latest_double:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents the largest set of integer values seen across bundles.
USER_TOP_N_INT64 = 6 [(monitoring_info_spec) = {
urn: "beam:metric:user:top_n_int64:v1",
type: "beam:metrics:top_n_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents the largest set of double values seen across bundles.
USER_TOP_N_DOUBLE = 7 [(monitoring_info_spec) = {
urn: "beam:metric:user:top_n_double:v1",
type: "beam:metrics:top_n_double:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents the smallest set of integer values seen across bundles.
USER_BOTTOM_N_INT64 = 8 [(monitoring_info_spec) = {
urn: "beam:metric:user:bottom_n_int64:v1",
type: "beam:metrics:bottom_n_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// Represents the smallest set of double values seen across bundles.
USER_BOTTOM_N_DOUBLE = 9 [(monitoring_info_spec) = {
urn: "beam:metric:user:bottom_n_double:v1",
type: "beam:metrics:bottom_n_double:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
// TODO(BEAM-9617): Support monitored state.
// USER_MONITORING_TABLE = XX [(monitoring_info_spec) = {
// urn: "beam:metric:user:v1",
// type: "beam:metrics:monitoring_table:v1",
// required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
// annotations: [{
// key: "description",
// value: "URN utilized to report user monitoring data."
// }]
// }];
ELEMENT_COUNT = 10 [(monitoring_info_spec) = {
urn: "beam:metric:element_count:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: [ "PCOLLECTION" ],
annotations: [ {
key: "description",
value: "The total elements output to a Pcollection by a PTransform."
} ]
SAMPLED_BYTE_SIZE = 11 [(monitoring_info_spec) = {
urn: "beam:metric:sampled_byte_size:v1",
type: "beam:metrics:distribution_int64:v1",
required_labels: [ "PCOLLECTION" ],
annotations: [ {
key: "description",
value: "The total byte size and count of a sampled "
" set (or all) of elements in the pcollection. Sampling is used "
" because calculating the byte count involves serializing the "
" elements which is CPU intensive."
} ]
START_BUNDLE_MSECS = 12 [(monitoring_info_spec) = {
urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: [ "PTRANSFORM" ],
annotations: [ {
key: "description",
value: "The total estimated execution time of the start bundle"
"function in a pardo"
} ]
PROCESS_BUNDLE_MSECS = 13 [(monitoring_info_spec) = {
urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: [ "PTRANSFORM" ],
annotations: [ {
key: "description",
value: "The total estimated execution time of the process bundle"
"function in a pardo"
} ]
FINISH_BUNDLE_MSECS = 14 [(monitoring_info_spec) = {
urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: [ "PTRANSFORM" ],
annotations: [ {
key: "description",
value: "The total estimated execution time of the finish bundle "
"function in a pardo"
} ]
TOTAL_MSECS = 15 [(monitoring_info_spec) = {
urn: "beam:metric:ptransform_execution_time:total_msecs:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: [ "PTRANSFORM" ],
annotations: [ {
key: "description",
value: "The total estimated execution time of the ptransform"
} ]
// All values reported across all beam:metric:ptransform_progress:.*:v1
// metrics are of the same magnitude.
WORK_REMAINING = 16 [(monitoring_info_spec) = {
urn: "beam:metric:ptransform_progress:remaining:v1",
type: "beam:metrics:progress:v1",
required_labels: [ "PTRANSFORM" ],
annotations: [{
key: "description",
value: "The remaining amount of work for each active element. Each active element represents an independent amount of work not shared with any other active element."
// All values reported across all beam:metric:ptransform_progress:.*:v1
// metrics are of the same magnitude.
WORK_COMPLETED = 17 [(monitoring_info_spec) = {
urn: "beam:metric:ptransform_progress:completed:v1",
type: "beam:metrics:progress:v1",
required_labels: [ "PTRANSFORM" ],
annotations: [{
key: "description",
value: "The remaining amount of work for each active element. Each active element represents an independent amount of work not shared with any other active element."
// The (0-based) index of the latest item processed from the data channel.
// This gives an indication of the SDKs progress through the data channel,
// and is a lower bound on where it is able to split.
// For an SDK that processes items sequentially, this is equivalently the
// number of items fully processed (or -1 if processing has not yet started).
DATA_CHANNEL_READ_INDEX = 18 [(monitoring_info_spec) = {
urn: "beam:metric:data_channel:read_index:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: [ "PTRANSFORM" ],
annotations: [{
key: "description",
value: "The read index of the data channel."
API_REQUEST_COUNT = 19 [(monitoring_info_spec) = {
urn: "beam:metric:io:api_request_count:v1",
type: "beam:metrics:sum_int64:v1",
required_labels: [
annotations: [
key: "description",
value: "Request counts with status made to IO service APIs to batch read or write elements."
key: "process_metric", // Should be reported as a process metric
// instead of a bundle metric
value: "true"
API_REQUEST_LATENCIES = 20 [(monitoring_info_spec) = {
urn: "beam:metric:io:api_request_latencies:v1",
type: "beam:metrics:histogram_int64:v1",
required_labels: [
annotations: [
key: "description",
value: "Histogram counts for request latencies made to IO service APIs to batch read or write elements."
key: "units",
value: "Milliseconds"
key: "process_metric", // Should be reported as a process metric
// instead of a bundle metric
value: "true"
// A set of properties for the MonitoringInfoLabel, this is useful to obtain
// the proper label string for the MonitoringInfoLabel.
message MonitoringInfoLabelProps {
// The label key to use in the MonitoringInfo labels map.
string name = 1;
// Enum extension to store MonitoringInfo related
// specifications, constants, etc.
extend google.protobuf.EnumValueOptions {
MonitoringInfoLabelProps label_props = 127337796; // From: commit 0x7970544.
// Enum extension to store the MonitoringInfoSpecs.
MonitoringInfoSpec monitoring_info_spec = 207174266;
message MonitoringInfo {
// (Required) Defines the semantic meaning of the metric or monitored state.
// See MonitoringInfoSpecs.Enum for the set of well known metrics/monitored
// state.
string urn = 1;
// (Required) Defines the encoding and aggregation method for the payload.
// See MonitoringInfoTypeUrns.Enum for the set of well known types.
string type = 2;
// (Required) The metric or monitored state encoded as per the specification
// defined by the type.
bytes payload = 3;
enum MonitoringInfoLabels {
// CODER, ENVIRONMENT, etc. must always match the keys used to
// refer to them. For actively processed bundles, these should match the
// values within the ProcessBundleDescriptor. For job management APIs,
// these should match values within the original pipeline representation.
TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }];
PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }];
WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
CODER = 3 [(label_props) = { name: "CODER" }];
ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
NAME = 6 [(label_props) = { name: "NAME" }];
SERVICE = 7 [(label_props) = { name: "SERVICE" }];
METHOD = 8 [(label_props) = { name: "METHOD" }];
RESOURCE = 9 [(label_props) = { name: "RESOURCE" }];
STATUS = 10 [(label_props) = { name: "STATUS" }];
BIGQUERY_PROJECT_ID = 11 [(label_props) = { name: "BIGQUERY_PROJECT_ID" }];
BIGQUERY_DATASET = 12 [(label_props) = { name: "BIGQUERY_DATASET" }];
BIGQUERY_TABLE = 13 [(label_props) = { name: "BIGQUERY_TABLE" }];
BIGQUERY_VIEW = 14 [(label_props) = { name: "BIGQUERY_VIEW" }];
BIGQUERY_QUERY_NAME = 15 [(label_props) = { name: "BIGQUERY_QUERY_NAME" }];
// A set of key and value labels which define the scope of the metric. For
// well known URNs, the set of required labels is provided by the associated
// MonitoringInfoSpec.
// Either a well defined entity id for matching the enum names in
// the MonitoringInfoLabels enum or any arbitrary label
// set by a custom metric or user metric.
// A monitoring system is expected to be able to aggregate the metrics
// together for all updates having the same URN and labels. Some systems such
// as Stackdriver will be able to aggregate the metrics using a subset of the
// provided labels
map<string, string> labels = 4;
// This indicates the start of the time range over which this value was
// measured.
// This is needed by some external metric aggregation services
// to indicate when the reporter of the metric first began collecting the
// cumulative value for the timeseries.
// If the SDK Harness restarts, it should reset the start_time, and reset
// the collection of cumulative metrics (i.e. start to count again from 0).
// HarnessMonitoringInfos should set this start_time once, when the
// MonitoringInfo is first reported.
// ProcessBundle MonitoringInfos should set a start_time for each bundle.
google.protobuf.Timestamp start_time = 5;
// A set of well known URNs that specify the encoding and aggregation method.
message MonitoringInfoTypeUrns {
enum Enum {
// Represents an integer counter where values are summed across bundles.
// Encoding: <value>
// - value: beam:coder:varint:v1
SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents a double counter where values are summed across bundles.
// Encoding: <value>
// value: beam:coder:double:v1
SUM_DOUBLE_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents a distribution of an integer value where:
// - count: represents the number of values seen across all bundles
// - sum: represents the total of the value across all bundles
// - min: represents the smallest value seen across all bundles
// - max: represents the largest value seen across all bundles
// Encoding: <count><sum><min><max>
// - count: beam:coder:varint:v1
// - sum: beam:coder:varint:v1
// - min: beam:coder:varint:v1
// - max: beam:coder:varint:v1
DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents a distribution of a double value where:
// - count: represents the number of values seen across all bundles
// - sum: represents the total of the value across all bundles
// - min: represents the smallest value seen across all bundles
// - max: represents the largest value seen across all bundles
// Encoding: <count><sum><min><max>
// - count: beam:coder:varint:v1
// - sum: beam:coder:double:v1
// - min: beam:coder:double:v1
// - max: beam:coder:double:v1
DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents the latest seen integer value. The timestamp is used to
// provide an "ordering" over multiple values to determine which is the
// latest.
// Encoding: <timestamp><value>
// - timestamp: beam:coder:varint:v1 (milliseconds since epoch)
// - value: beam:coder:varint:v1
LATEST_INT64_TYPE = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents the latest seen double value. The timestamp is used to
// provide an "ordering" over multiple values to determine which is the
// latest.
// Encoding: <timestamp><value>
// - timestamp: beam:coder:varint:v1 (milliseconds since epoch)
// - value: beam:coder:double:v1
LATEST_DOUBLE_TYPE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents the largest set of integer values seen across bundles.
// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:varint:v1
TOP_N_INT64_TYPE = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents the largest set of double values seen across bundles.
// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder<beam:coder:double:v1
TOP_N_DOUBLE_TYPE = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents the smallest set of integer values seen across bundles.
// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:varint:v1
BOTTOM_N_INT64_TYPE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Represents the smallest set of double values seen across bundles.
// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:double:v1
BOTTOM_N_DOUBLE_TYPE = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:double:v1
PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
// TODO(BEAM-9617): Support monitored state.
// Encoding: MonitoringTableData proto encoded as bytes
// MONITORING_TABLE_TYPE = XX [(org.apache.beam.model.pipeline.v1.beam_urn) =
// "beam:metrics:monitoring_table:v1"];
// General monitored state information which contains structured information
// which does not fit into a typical metric format.
// An SDK should always report its current state, that is all relevant monitored
// state for its PTransform at the current moment and this should be kept small.
// For example, a table of important files and metadata which an I/O source is
// reading. An SDK can emit the oldest three files which have been waiting for
// data for over 1 hour. If an engine supports the URN with a custom aggregation
// then it can filter these and keep only the Top-3 rows based on how long the
// files have been waiting for data. Otherwise an engine can ignore the
// MonitoringTableData or union all the rows together into one large table and
// display them in a UI.
// Note: Since MonitoredState is designed to be customizable, and allow engines
// to aggregate these metrics in custom ways. Engines can use custom aggregation
// functions for specific URNs by inspecting the column values.
// TODO(BEAM-9617): Support monitored state.
// message MonitoringTableData {
// message MonitoringColumnValue {
// oneof value {
// int64 int64_value = 1;
// double double_value = 2;
// string string_value = 3;
// google.protobuf.Timestamp timestamp = 4;
// }
// }
// message MonitoringRow {
// repeated MonitoringColumnValue values = 1;
// }
// // The number of column names must match the
// // number of values in each MonitoringRow.
// repeated string column_names = 1;
// repeated MonitoringRow row_data = 2;
// }