blob: a21f1093bbfa14a4859dd9999c559853cd773667 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package metrics implements the Beam metrics API, described at
// Metrics in the Beam model are uniquely identified by a namespace, a name,
// and the PTransform context in which they are used. Further, they are
// reported as a delta against the bundle being processed, so that overcounting
// doesn't occur if a bundle needs to be retried. Each metric is scoped to
// their bundle, and ptransform.
// Cells (or metric cells) are defined for each Beam model metric
// type, and the serve as concurrency safe storage of a given metric's values.
// Proxys are exported values representing the metric, for use in user
// ptransform code. They don't retain their cells, since they don't have
// the context to be able to store them for export back to the pipeline runner.
// Metric cells aren't initialized until their first mutation, which
// follows from the Beam model design, where metrics are only sent for a bundle
// if they have changed. This is particularly convenient for distributions which
// means their min and max fields can be set to the first value on creation
// rather than have some marker of uninitialized state, which would otherwise
// need to be checked for on every update.
// Metric values are implemented as lightweight proxies of the user provided
// namespace and name. This allows them to be declared globally, and used in
// any ParDo. Further, as per the design, they can be declared dynamically
// at runtime.
// To handle reporting deltas on the metrics by bundle, metrics
// are keyed by bundleID,PTransformID,namespace, and name, so metrics that
// are identical except for bundles are treated as distinct, effectively
// providing per bundle deltas, since a new value cell is used per bundle.
package metrics
import (
// Metric cells are named and scoped by ptransform, and bundle,
// the latter two of which are only known at runtime. We propagate
// the PTransformID and BundleID via a context.Context. Consequently
// using metrics requires the PTransform have a context.Context
// argument.
// perBundle is a struct that retains per transform countersets.
// TODO(lostluck): Migrate the exec package to use these to extract
// metric data for export to runner, rather than the global store.
type perBundle struct {
mu sync.Mutex
css []*ptCounterSet
type nameHash uint64
// ptCounterSet is the internal tracking struct for a single ptransform
// in a single bundle for all counter types.
type ptCounterSet struct {
// We store the user path access to the cells in metric type segregated
// maps. At present, caching the name hash,
counters map[nameHash]*counter
distributions map[nameHash]*distribution
gauges map[nameHash]*gauge
type ctxKey string
const (
bundleKey ctxKey = "beam:bundle"
ptransformKey ctxKey = "beam:ptransform"
counterSetKey ctxKey = "beam:counterset"
// beamCtx is a caching context for IDs necessary to place metric updates.
// Allocating contexts and searching for PTransformIDs for every element
// is expensive, so we avoid it if possible.
type beamCtx struct {
bundleID, ptransformID string
bs *perBundle
cs *ptCounterSet
// Value implements context.Value for beamCtx, and lifts the
// values for metrics keys for value for faster lookups.
func (ctx *beamCtx) Value(key interface{}) interface{} {
switch key {
case counterSetKey:
if ctx.cs == nil {
if cs := ctx.Context.Value(key); cs != nil {
ctx.cs = cs.(*ptCounterSet)
} else {
return nil
return ctx.cs
case bundleKey:
if ctx.bundleID == "" {
if id := ctx.Context.Value(key); id != nil {
ctx.bundleID = id.(string)
} else {
return nil
return ctx.bundleID
case ptransformKey:
if ctx.ptransformID == "" {
if id := ctx.Context.Value(key); id != nil {
ctx.ptransformID = id.(string)
} else {
return nil
return ctx.ptransformID
return ctx.Context.Value(key)
func (ctx *beamCtx) String() string {
return fmt.Sprintf("beamCtx[%s;%s]", ctx.bundleID, ctx.ptransformID)
// SetBundleID sets the id of the current Bundle.
func SetBundleID(ctx context.Context, id string) context.Context {
// Checking for *beamCtx is an optimization, so we don't dig deeply
// for ids if not necessary.
if bctx, ok := ctx.(*beamCtx); ok {
return &beamCtx{Context: bctx.Context, bundleID: id, bs: &perBundle{}, ptransformID: bctx.ptransformID}
return &beamCtx{Context: ctx, bundleID: id, bs: &perBundle{}}
// SetPTransformID sets the id of the current PTransform.
// Must only be called on a context returened by SetBundleID.
func SetPTransformID(ctx context.Context, id string) context.Context {
// Checking for *beamCtx is an optimization, so we don't dig deeply
// for ids if not necessary.
if bctx, ok := ctx.(*beamCtx); ok {
return &beamCtx{Context: bctx.Context, bundleID: bctx.bundleID, bs:, ptransformID: id}
panic(fmt.Sprintf("SetPTransformID called before SetBundleID for %v", id))
return nil // never runs.
const (
bundleIDUnset = "(bundle id unset)"
ptransformIDUnset = "(ptransform id unset)"
func getContextKey(ctx context.Context, n name) key {
key := key{name: n, bundle: bundleIDUnset, ptransform: ptransformIDUnset}
if id := ctx.Value(bundleKey); id != nil {
key.bundle = id.(string)
if id := ctx.Value(ptransformKey); id != nil {
key.ptransform = id.(string)
return key
func getCounterSet(ctx context.Context) *ptCounterSet {
if id := ctx.Value(counterSetKey); id != nil {
return id.(*ptCounterSet)
// It's not set anywhere and wasn't hoisted, so create it.
if bctx, ok := ctx.(*beamCtx); ok {
cs := &ptCounterSet{
counters: make(map[nameHash]*counter),
distributions: make(map[nameHash]*distribution),
gauges: make(map[nameHash]*gauge),
} = append(, cs)
bctx.cs = cs
return cs
panic("counterSet missing, beam isn't set up properly.")
return nil // never runs.
type kind uint8
const (
kindUnknown kind = iota
func (t kind) String() string {
switch t {
case kindSumCounter:
return "Counter"
case kindDistribution:
return "Distribution"
case kindGauge:
return "Gauge"
panic(fmt.Sprintf("Unknown metric type value: %v", uint8(t)))
// userMetric knows how to convert it's value to a Metrics_User proto.
// TODO(lostluck): Move proto translation to the harness package to
// avoid the proto dependency outside of the harness.
type userMetric interface {
toProto() *fnexecution_v1.Metrics_User
kind() kind
// name is a pair of strings identifying a specific metric.
type name struct {
namespace, name string
func (n name) String() string {
return fmt.Sprintf("%s.%s", n.namespace,
func newName(ns, n string) name {
if len(n) == 0 || len(ns) == 0 {
panic(fmt.Sprintf("namespace and name are required to be non-empty, got %q and %q", ns, n))
return name{namespace: ns, name: n}
// We hash the name to a uint64 so we avoid using go's native string hashing for
// every use of a metrics. uint64s have faster lookup than strings as a result.
// Collisions are possible, but statistically unlikely as namespaces and names
// are usually short enough to avoid this.
var (
hasherMu sync.Mutex
hasher = fnv.New64a()
func hashName(ns, n string) nameHash {
var buf [64]byte
b := buf[:]
hashString(ns, b)
hashString(n, b)
h := hasher.Sum64()
return nameHash(h)
// hashString hashes a string with the package level hasher
// and requires posession of the hasherMu lock. The byte
// slice is assumed to be backed by a [64]byte.
func hashString(s string, b []byte) {
l := len(s)
i := 0
for len(s)-i > 64 {
n := i + 64
copy(b, s[i:n])
ioutilx.WriteUnsafe(hasher, b)
i = n
n := l - i
copy(b, s[i:])
ioutilx.WriteUnsafe(hasher, b[:n])
type key struct {
name name
bundle, ptransform string
var (
// mu protects access to the global store
mu sync.RWMutex
// store is a map of BundleIDs to PtransformIDs to userMetrics.
// it permits us to extract metric protos for runners per data Bundle, and
// per PTransform.
// TODO(lostluck): Migrate exec/plan.go to manage it's own perBundle counter stores.
// Note: This is safe to use to read the metrics concurrently with user modification
// in bundles, as only initial use modifies this map, and only contains the resulting
// metrics.
store = make(map[string]map[string]map[name]userMetric)
// storeMetric stores a metric away on its first use so it may be retrieved later on.
// In the event of a name collision, storeMetric can panic, so it's prudent to release
// locks if they are no longer required.
func storeMetric(key key, m userMetric) {
defer mu.Unlock()
if _, ok := store[key.bundle]; !ok {
store[key.bundle] = make(map[string]map[name]userMetric)
if _, ok := store[key.bundle][key.ptransform]; !ok {
store[key.bundle][key.ptransform] = make(map[name]userMetric)
if ms, ok := store[key.bundle][key.ptransform][]; ok {
if ms.kind() != m.kind() {
panic(fmt.Sprintf("metric name %s being reused for a second metric in a single PTransform",
store[key.bundle][key.ptransform][] = m
// Counter is a simple counter for incrementing and decrementing a value.
type Counter struct {
name name
hash nameHash
func (m *Counter) String() string {
return fmt.Sprintf("Counter metric %s",
// NewCounter returns the Counter with the given namespace and name.
func NewCounter(ns, n string) *Counter {
return &Counter{
name: newName(ns, n),
hash: hashName(ns, n),
// Inc increments the counter within the given PTransform context by v.
func (m *Counter) Inc(ctx context.Context, v int64) {
cs := getCounterSet(ctx)
if c, ok := cs.counters[m.hash]; ok {
// We're the first to create this metric!
c := &counter{
value: v,
cs.counters[m.hash] = c
key := getContextKey(ctx,
storeMetric(key, c)
// Dec decrements the counter within the given PTransform context by v.
func (m *Counter) Dec(ctx context.Context, v int64) {
m.Inc(ctx, -v)
// counter is a metric cell for counter values.
type counter struct {
value int64
func (m *counter) inc(v int64) {
atomic.AddInt64(&m.value, v)
func (m *counter) String() string {
return fmt.Sprintf("value: %d", m.value)
// toProto returns a Metrics_User populated with the Data messages, but not the name. The
// caller needs to populate with the metric's name.
func (m *counter) toProto() *fnexecution_v1.Metrics_User {
return &fnexecution_v1.Metrics_User{
Data: &fnexecution_v1.Metrics_User_CounterData_{
CounterData: &fnexecution_v1.Metrics_User_CounterData{
Value: atomic.LoadInt64(&m.value),
func (m *counter) kind() kind {
return kindSumCounter
// Distribution is a simple distribution of values.
type Distribution struct {
name name
hash nameHash
func (m *Distribution) String() string {
return fmt.Sprintf("Distribution metric %s",
// NewDistribution returns the Distribution with the given namespace and name.
func NewDistribution(ns, n string) *Distribution {
return &Distribution{
name: newName(ns, n),
hash: hashName(ns, n),
// Update updates the distribution within the given PTransform context with v.
func (m *Distribution) Update(ctx context.Context, v int64) {
cs := getCounterSet(ctx)
if d, ok := cs.distributions[m.hash]; ok {
// We're the first to create this metric!
d := &distribution{
count: 1,
sum: v,
min: v,
max: v,
cs.distributions[m.hash] = d
key := getContextKey(ctx,
storeMetric(key, d)
// distribution is a metric cell for distribution values.
type distribution struct {
count, sum, min, max int64
mu sync.Mutex
func (m *distribution) update(v int64) {
if v < m.min {
m.min = v
if v > m.max {
m.max = v
m.sum += v
func (m *distribution) String() string {
return fmt.Sprintf("count: %d sum: %d min: %d max: %d", m.count, m.sum, m.min, m.max)
// toProto returns a Metrics_User populated with the Data messages, but not the name. The
// caller needs to populate with the metric's name.
func (m *distribution) toProto() *fnexecution_v1.Metrics_User {
return &fnexecution_v1.Metrics_User{
Data: &fnexecution_v1.Metrics_User_DistributionData_{
DistributionData: &fnexecution_v1.Metrics_User_DistributionData{
Count: m.count,
Sum: m.sum,
Min: m.min,
Max: m.max,
func (m *distribution) kind() kind {
return kindDistribution
// Gauge is a time, value pair metric.
type Gauge struct {
name name
hash nameHash
func (m *Gauge) String() string {
return fmt.Sprintf("Guage metric %s",
// NewGauge returns the Gauge with the given namespace and name.
func NewGauge(ns, n string) *Gauge {
return &Gauge{
name: newName(ns, n),
hash: hashName(ns, n),
// TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once that exists.
var now = time.Now
// Set sets the gauge to the given value, and associates it with the current time on the clock.
func (m *Gauge) Set(ctx context.Context, v int64) {
cs := getCounterSet(ctx)
if g, ok := cs.gauges[m.hash]; ok {
// We're the first to create this metric!
g := &gauge{
t: now(),
v: v,
cs.gauges[m.hash] = g
key := getContextKey(ctx,
storeMetric(key, g)
// gauge is a metric cell for gauge values.
type gauge struct {
mu sync.Mutex
t time.Time
v int64
func (m *gauge) set(v int64) {
m.t = now()
m.v = v
func (m *gauge) toProto() *fnexecution_v1.Metrics_User {
ts, err := ptypes.TimestampProto(m.t)
if err != nil {
return &fnexecution_v1.Metrics_User{
Data: &fnexecution_v1.Metrics_User_GaugeData_{
GaugeData: &fnexecution_v1.Metrics_User_GaugeData{
Value: m.v,
Timestamp: ts,
func (m *gauge) kind() kind {
return kindGauge
func (m *gauge) String() string {
return fmt.Sprintf("%v time: %s value: %d", m.kind(), m.t, m.v)
// ToProto exports all collected metrics for the given BundleID and PTransform ID pair.
func ToProto(b, pt string) []*fnexecution_v1.Metrics_User {
defer mu.RUnlock()
ps := store[b]
s := ps[pt]
var ret []*fnexecution_v1.Metrics_User
for n, m := range s {
p := m.toProto()
p.MetricName = &fnexecution_v1.Metrics_User_MetricName{
Namespace: n.namespace,
ret = append(ret, p)
return ret
// DumpToLog is a debugging function that outputs all metrics available locally to beam.Log.
func DumpToLog(ctx context.Context) {
dumpTo(func(format string, args ...interface{}) {
log.Errorf(ctx, format, args...)
// DumpToOut is a debugging function that outputs all metrics available locally to std out.
func DumpToOut() {
dumpTo(func(format string, args ...interface{}) {
fmt.Printf(format+"\n", args...)
func dumpTo(p func(format string, args ...interface{})) {
defer mu.RUnlock()
var bs []string
for b := range store {
bs = append(bs, b)
for _, b := range bs {
var pts []string
for pt := range store[b] {
pts = append(pts, pt)
for _, pt := range pts {
var ns []name
for n := range store[b][pt] {
ns = append(ns, n)
sort.Slice(ns, func(i, j int) bool {
if ns[i].namespace < ns[j].namespace {
return true
if ns[i].namespace == ns[j].namespace && ns[i].name < ns[j].name {
return true
return false
p("Bundle: %q - PTransformID: %q", b, pt)
for _, n := range ns {
m := store[b][pt][n]
p("\t%s - %s", n, m)
// Clear resets all storage associated with metrics for tests.
// Calling this in pipeline code leads to inaccurate metrics.
func Clear() {
store = make(map[string]map[string]map[name]userMetric)
// ClearBundleData removes stored references associated with a given bundle,
// so it can be garbage collected.
func ClearBundleData(b string) {
// No concurrency races since mu guards all access to store,
// and the metric cell sync.Maps are goroutine safe.
delete(store, b)