| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package metrics implements the Beam metrics API, described at |
| // http://s.apache.org/beam-metrics-api |
| // |
| // Metrics in the Beam model are uniquely identified by a namespace, a name, |
| // and the PTransform context in which they are used. Further, they are |
| // reported as a delta against the bundle being processed, so that overcounting |
| // doesn't occur if a bundle needs to be retried. Each metric is scoped to |
| // their bundle, and ptransform. |
| // |
| // Cells (or metric cells) are defined for each Beam model metric |
| // type, and the serve as concurrency safe storage of a given metric's values. |
| // Proxys are exported values representing the metric, for use in user |
| // ptransform code. They don't retain their cells, since they don't have |
| // the context to be able to store them for export back to the pipeline runner. |
| // |
| // Metric cells aren't initialized until their first mutation, which |
| // follows from the Beam model design, where metrics are only sent for a bundle |
| // if they have changed. This is particularly convenient for distributions which |
| // means their min and max fields can be set to the first value on creation |
| // rather than have some marker of uninitialized state, which would otherwise |
| // need to be checked for on every update. |
| // |
| // Metric values are implemented as lightweight proxies of the user provided |
| // namespace and name. This allows them to be declared globally, and used in |
| // any ParDo. Further, as per the design, they can be declared dynamically |
| // at runtime. |
| // |
| // To handle reporting deltas on the metrics by bundle, metrics |
| // are keyed by bundleID,PTransformID,namespace, and name, so metrics that |
| // are identical except for bundles are treated as distinct, effectively |
| // providing per bundle deltas, since a new value cell is used per bundle. |
| package metrics |
| |
| import ( |
| "context" |
| "fmt" |
| "sort" |
| "sync" |
| "time" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam/log" |
| "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" |
| "github.com/golang/protobuf/ptypes" |
| ) |
| |
| // Metric cells are named and scoped by ptransform, and bundle, |
| // the latter two of which are only known at runtime. We propagate |
| // the PTransformID and BundleID via a context.Context. Consequently |
| // using metrics requires the PTransform have a context.Context |
| // argument. |
| |
| type ctxKey string |
| |
| const bundleKey ctxKey = "beam:bundle" |
| const ptransformKey ctxKey = "beam:ptransform" |
| |
| // beamCtx is a caching context for IDs necessary to place metric updates. |
| // Allocating contexts and searching for PTransformIDs for every element |
| // is expensive, so we avoid it if possible. |
| type beamCtx struct { |
| context.Context |
| bundleID, ptransformID string |
| } |
| |
| // Value lifts the beam contLift the keys value for faster lookups when not available. |
| func (ctx *beamCtx) Value(key interface{}) interface{} { |
| switch key { |
| case bundleKey: |
| if ctx.bundleID == "" { |
| if id := ctx.Context.Value(key); id != nil { |
| ctx.bundleID = id.(string) |
| } else { |
| return nil |
| } |
| } |
| return ctx.bundleID |
| case ptransformKey: |
| if ctx.ptransformID == "" { |
| if id := ctx.Context.Value(key); id != nil { |
| ctx.ptransformID = id.(string) |
| } else { |
| return nil |
| } |
| } |
| return ctx.ptransformID |
| } |
| return ctx.Context.Value(key) |
| } |
| |
| // SetBundleID sets the id of the current Bundle. |
| func SetBundleID(ctx context.Context, id string) context.Context { |
| // Checking for *beamCtx is an optimization, so we don't dig deeply |
| // for ids if not necessary. |
| if bctx, ok := ctx.(*beamCtx); ok { |
| return &beamCtx{Context: bctx.Context, bundleID: id, ptransformID: bctx.ptransformID} |
| } |
| return &beamCtx{Context: ctx, bundleID: id} |
| } |
| |
| // SetPTransformID sets the id of the current PTransform. |
| func SetPTransformID(ctx context.Context, id string) context.Context { |
| // Checking for *beamCtx is an optimization, so we don't dig deeply |
| // for ids if not necessary. |
| if bctx, ok := ctx.(*beamCtx); ok { |
| return &beamCtx{Context: bctx.Context, bundleID: bctx.bundleID, ptransformID: id} |
| } |
| return &beamCtx{Context: ctx, ptransformID: id} |
| } |
| |
| const ( |
| bundleIDUnset = "(bundle id unset)" |
| ptransformIDUnset = "(ptransform id unset)" |
| ) |
| |
| func getContextKey(ctx context.Context, n name) key { |
| key := key{name: n, bundle: bundleIDUnset, ptransform: ptransformIDUnset} |
| if id := ctx.Value(bundleKey); id != nil { |
| key.bundle = id.(string) |
| } |
| if id := ctx.Value(ptransformKey); id != nil { |
| key.ptransform = id.(string) |
| } |
| return key |
| } |
| |
| // userMetric knows how to convert it's value to a Metrics_User proto. |
| type userMetric interface { |
| toProto() *fnexecution_v1.Metrics_User |
| } |
| |
| // name is a pair of strings identifying a specific metric. |
| type name struct { |
| namespace, name string |
| } |
| |
| func (n name) String() string { |
| return fmt.Sprintf("%s.%s", n.namespace, n.name) |
| } |
| |
| func newName(ns, n string) name { |
| if len(n) == 0 || len(ns) == 0 { |
| panic(fmt.Sprintf("namespace and name are required to be non-empty, got %q and %q", ns, n)) |
| } |
| return name{namespace: ns, name: n} |
| } |
| |
| type key struct { |
| name name |
| bundle, ptransform string |
| } |
| |
| var ( |
| // mu protects access to store |
| mu sync.RWMutex |
| // store is a map of BundleIDs to PtransformIDs to userMetrics. |
| // it permits us to extract metric protos for runners per data Bundle, and |
| // per PTransform. |
| store = make(map[string]map[string]map[name]userMetric) |
| |
| // We store the user path access to the cells in metric type segregated |
| // sync.Maps. Using sync.Maps lets metrics with disjoint keys have concurrent |
| // access to the cells, and using separate sync.Map per metric type |
| // simplifies code understanding, since each only contains a single type of |
| // cell. |
| |
| countersMu sync.RWMutex |
| counters = make(map[key]*counter) |
| distributionsMu sync.RWMutex |
| distributions = make(map[key]*distribution) |
| gaugesMu sync.RWMutex |
| gauges = make(map[key]*gauge) |
| ) |
| |
| // TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once that exists. |
| var now = time.Now |
| |
| // Counter is a simple counter for incrementing and decrementing a value. |
| type Counter struct { |
| name name |
| // The following are a fast cache of the key and storage |
| mu sync.Mutex |
| k key |
| c *counter |
| } |
| |
| func (m *Counter) String() string { |
| return fmt.Sprintf("Counter metric %s", m.name) |
| } |
| |
| // NewCounter returns the Counter with the given namespace and name. |
| func NewCounter(ns, n string) *Counter { |
| mn := newName(ns, n) |
| return &Counter{ |
| name: mn, |
| } |
| } |
| |
| // Inc increments the counter within the given PTransform context by v. |
| func (m *Counter) Inc(ctx context.Context, v int64) { |
| key := getContextKey(ctx, m.name) |
| cs := &counter{ |
| value: v, |
| } |
| m.mu.Lock() |
| if m.k == key { |
| m.c.inc(v) |
| m.mu.Unlock() |
| return |
| } |
| m.k = key |
| countersMu.Lock() |
| if c, loaded := counters[key]; loaded { |
| m.c = c |
| countersMu.Unlock() |
| m.mu.Unlock() |
| c.inc(v) |
| return |
| } |
| m.c = cs |
| counters[key] = cs |
| countersMu.Unlock() |
| m.mu.Unlock() |
| storeMetric(key, cs) |
| } |
| |
| // Dec decrements the counter within the given PTransform context by v. |
| func (m *Counter) Dec(ctx context.Context, v int64) { |
| m.Inc(ctx, -v) |
| } |
| |
| // counter is a metric cell for counter values. |
| type counter struct { |
| value int64 |
| mu sync.Mutex |
| } |
| |
| func (m *counter) inc(v int64) { |
| m.mu.Lock() |
| m.value += v |
| m.mu.Unlock() |
| } |
| |
| func (m *counter) String() string { |
| return fmt.Sprintf("value: %d", m.value) |
| } |
| |
| // toProto returns a Metrics_User populated with the Data messages, but not the name. The |
| // caller needs to populate with the metric's name. |
| func (m *counter) toProto() *fnexecution_v1.Metrics_User { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
| return &fnexecution_v1.Metrics_User{ |
| Data: &fnexecution_v1.Metrics_User_CounterData_{ |
| CounterData: &fnexecution_v1.Metrics_User_CounterData{ |
| Value: m.value, |
| }, |
| }, |
| } |
| } |
| |
| // Distribution is a simple distribution of values. |
| type Distribution struct { |
| name name |
| |
| // The following are a fast cache of the key and storage |
| mu sync.Mutex |
| k key |
| d *distribution |
| } |
| |
| func (m *Distribution) String() string { |
| return fmt.Sprintf("Distribution metric %s", m.name) |
| } |
| |
| // NewDistribution returns the Distribution with the given namespace and name. |
| func NewDistribution(ns, n string) *Distribution { |
| mn := newName(ns, n) |
| return &Distribution{ |
| name: mn, |
| } |
| } |
| |
| // Update updates the distribution within the given PTransform context with v. |
| func (m *Distribution) Update(ctx context.Context, v int64) { |
| key := getContextKey(ctx, m.name) |
| ds := &distribution{ |
| count: 1, |
| sum: v, |
| min: v, |
| max: v, |
| } |
| m.mu.Lock() |
| if m.k == key { |
| m.d.update(v) |
| m.mu.Unlock() |
| return |
| } |
| m.k = key |
| distributionsMu.Lock() |
| if d, loaded := distributions[key]; loaded { |
| m.d = d |
| distributionsMu.Unlock() |
| m.mu.Unlock() |
| d.update(v) |
| return |
| } |
| m.d = ds |
| distributions[key] = ds |
| distributionsMu.Unlock() |
| m.mu.Unlock() |
| storeMetric(key, ds) |
| } |
| |
| // distribution is a metric cell for distribution values. |
| type distribution struct { |
| count, sum, min, max int64 |
| mu sync.Mutex |
| } |
| |
| func (m *distribution) update(v int64) { |
| m.mu.Lock() |
| if v < m.min { |
| m.min = v |
| } |
| if v > m.max { |
| m.max = v |
| } |
| m.count++ |
| m.sum += v |
| m.mu.Unlock() |
| } |
| |
| func (m *distribution) String() string { |
| return fmt.Sprintf("count: %d sum: %d min: %d max: %d", m.count, m.sum, m.min, m.max) |
| } |
| |
| // toProto returns a Metrics_User populated with the Data messages, but not the name. The |
| // caller needs to populate with the metric's name. |
| func (m *distribution) toProto() *fnexecution_v1.Metrics_User { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
| return &fnexecution_v1.Metrics_User{ |
| Data: &fnexecution_v1.Metrics_User_DistributionData_{ |
| DistributionData: &fnexecution_v1.Metrics_User_DistributionData{ |
| Count: m.count, |
| Sum: m.sum, |
| Min: m.min, |
| Max: m.max, |
| }, |
| }, |
| } |
| } |
| |
| // Gauge is a time, value pair metric. |
| type Gauge struct { |
| name name |
| |
| // The following are a fast cache of the key and storage |
| mu sync.Mutex |
| k key |
| g *gauge |
| } |
| |
| func (m *Gauge) String() string { |
| return fmt.Sprintf("Guage metric %s", m.name) |
| } |
| |
| // NewGauge returns the Gauge with the given namespace and name. |
| func NewGauge(ns, n string) *Gauge { |
| mn := newName(ns, n) |
| return &Gauge{ |
| name: mn, |
| } |
| } |
| |
| // Set sets the gauge to the given value, and associates it with the current time on the clock. |
| func (m *Gauge) Set(ctx context.Context, v int64) { |
| key := getContextKey(ctx, m.name) |
| gs := &gauge{ |
| t: now(), |
| v: v, |
| } |
| m.mu.Lock() |
| if m.k == key { |
| m.g.set(v) |
| m.mu.Unlock() |
| return |
| } |
| m.k = key |
| gaugesMu.Lock() |
| if g, loaded := gauges[key]; loaded { |
| m.g = g |
| gaugesMu.Unlock() |
| m.mu.Unlock() |
| g.set(v) |
| return |
| } |
| m.g = gs |
| gauges[key] = gs |
| gaugesMu.Unlock() |
| m.mu.Unlock() |
| storeMetric(key, gs) |
| } |
| |
| // storeMetric stores a metric away on its first use so it may be retrieved later on. |
| // In the event of a name collision, storeMetric can panic, so it's prudent to release |
| // locks if they are no longer required. |
| func storeMetric(key key, m userMetric) { |
| mu.Lock() |
| defer mu.Unlock() |
| if _, ok := store[key.bundle]; !ok { |
| store[key.bundle] = make(map[string]map[name]userMetric) |
| } |
| if _, ok := store[key.bundle][key.ptransform]; !ok { |
| store[key.bundle][key.ptransform] = make(map[name]userMetric) |
| } |
| if _, ok := store[key.bundle][key.ptransform][key.name]; ok { |
| panic(fmt.Sprintf("metric name %s being reused for a second metric in a single PTransform", key.name)) |
| } |
| store[key.bundle][key.ptransform][key.name] = m |
| } |
| |
| // gauge is a metric cell for gauge values. |
| type gauge struct { |
| mu sync.Mutex |
| t time.Time |
| v int64 |
| } |
| |
| func (m *gauge) set(v int64) { |
| m.mu.Lock() |
| m.t = now() |
| m.v = v |
| m.mu.Unlock() |
| } |
| |
| func (m *gauge) toProto() *fnexecution_v1.Metrics_User { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
| ts, err := ptypes.TimestampProto(m.t) |
| if err != nil { |
| panic(err) |
| } |
| return &fnexecution_v1.Metrics_User{ |
| Data: &fnexecution_v1.Metrics_User_GaugeData_{ |
| GaugeData: &fnexecution_v1.Metrics_User_GaugeData{ |
| Value: m.v, |
| Timestamp: ts, |
| }, |
| }, |
| } |
| } |
| |
| func (m *gauge) String() string { |
| return fmt.Sprintf("time: %s value: %d", m.t, m.v) |
| } |
| |
| // ToProto exports all collected metrics for the given BundleID and PTransform ID pair. |
| func ToProto(b, pt string) []*fnexecution_v1.Metrics_User { |
| mu.RLock() |
| defer mu.RUnlock() |
| ps := store[b] |
| s := ps[pt] |
| var ret []*fnexecution_v1.Metrics_User |
| for n, m := range s { |
| p := m.toProto() |
| p.MetricName = &fnexecution_v1.Metrics_User_MetricName{ |
| Name: n.name, |
| Namespace: n.namespace, |
| } |
| ret = append(ret, p) |
| } |
| return ret |
| } |
| |
| // DumpToLog is a debugging function that outputs all metrics available locally to beam.Log. |
| func DumpToLog(ctx context.Context) { |
| dumpTo(func(format string, args ...interface{}) { |
| log.Errorf(ctx, format, args...) |
| }) |
| } |
| |
| // DumpToOut is a debugging function that outputs all metrics available locally to std out. |
| func DumpToOut() { |
| dumpTo(func(format string, args ...interface{}) { |
| fmt.Printf(format+"\n", args...) |
| }) |
| } |
| |
| func dumpTo(p func(format string, args ...interface{})) { |
| mu.RLock() |
| defer mu.RUnlock() |
| countersMu.RLock() |
| defer countersMu.RUnlock() |
| distributionsMu.RLock() |
| defer distributionsMu.RUnlock() |
| gaugesMu.RLock() |
| defer gaugesMu.RUnlock() |
| var bs []string |
| for b := range store { |
| bs = append(bs, b) |
| } |
| sort.Strings(bs) |
| for _, b := range bs { |
| var pts []string |
| for pt := range store[b] { |
| pts = append(pts, pt) |
| } |
| sort.Strings(pts) |
| for _, pt := range pts { |
| var ns []name |
| for n := range store[b][pt] { |
| ns = append(ns, n) |
| } |
| sort.Slice(ns, func(i, j int) bool { |
| if ns[i].namespace < ns[j].namespace { |
| return true |
| } |
| if ns[i].namespace == ns[j].namespace && ns[i].name < ns[j].name { |
| return true |
| } |
| return false |
| }) |
| p("Bundle: %q - PTransformID: %q", b, pt) |
| for _, n := range ns { |
| key := key{name: n, bundle: b, ptransform: pt} |
| if m, ok := counters[key]; ok { |
| p("\t%s - %s", key.name, m) |
| } |
| if m, ok := distributions[key]; ok { |
| p("\t%s - %s", key.name, m) |
| } |
| if m, ok := gauges[key]; ok { |
| p("\t%s - %s", key.name, m) |
| } |
| } |
| } |
| } |
| } |
| |
| // Clear resets all storage associated with metrics for tests. |
| // Calling this in pipeline code leads to inaccurate metrics. |
| func Clear() { |
| mu.Lock() |
| store = make(map[string]map[string]map[name]userMetric) |
| counters = make(map[key]*counter) |
| distributions = make(map[key]*distribution) |
| gauges = make(map[key]*gauge) |
| mu.Unlock() |
| } |
| |
| // ClearBundleData removes stored references associated with a given bundle, |
| // so it can be garbage collected. |
| func ClearBundleData(b string) { |
| // No concurrency races since mu guards all access to store, |
| // and the metric cell sync.Maps are goroutine safe. |
| mu.Lock() |
| pts := store[b] |
| countersMu.Lock() |
| distributionsMu.Lock() |
| gaugesMu.Lock() |
| for pt, m := range pts { |
| for n := range m { |
| key := key{name: n, bundle: b, ptransform: pt} |
| delete(counters, key) |
| delete(distributions, key) |
| delete(gauges, key) |
| } |
| } |
| countersMu.Unlock() |
| distributionsMu.Unlock() |
| gaugesMu.Unlock() |
| delete(store, b) |
| mu.Unlock() |
| } |