/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main

import (
	"math"
	"org/apache/htrace/common"
	"org/apache/htrace/conf"
	"sync"
	"time"
)
//
// The Metrics Sink for HTraced.
//
// The metrics sink keeps track of metrics for the htraced daemon.
// It is important to have good metrics so that we can properly manage htraced.  In particular, we
// need to know the rate at which we are receiving spans, and the main places spans come from.  If
// spans were dropped because of a high sampling rate, we need to know which part of the system
// dropped them so that we can adjust the sampling rate there.
//
const LATENCY_CIRC_BUF_SIZE = 4096
type MetricsSink struct {
	// The metrics sink logger.
	lg *common.Logger
	// The maximum number of entries we should allow in the HostSpanMetrics map.
	maxMtx int
	// The total number of spans ingested by the server (counting dropped spans).
	IngestedSpans uint64
	// The total number of spans written to leveldb since the server started.
	WrittenSpans uint64
	// The total number of spans dropped by the server.
	ServerDropped uint64
	// The total number of spans dropped by the client (self-reported).
	ClientDroppedEstimate uint64
	// Per-host span metrics.
	HostSpanMetrics common.SpanMetricsMap
	// The last few writeSpan latencies.
	wsLatencyCircBuf *CircBufU32
	// Lock protecting all metrics.
	lock sync.Mutex
}
func NewMetricsSink(cnf *conf.Config) *MetricsSink {
	return &MetricsSink{
		lg:               common.NewLogger("metrics", cnf),
		maxMtx:           cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
		HostSpanMetrics:  make(common.SpanMetricsMap),
		wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
	}
}
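// Illustrative sketch, not part of the original htraced source: constructing
// the sink from the daemon configuration and recording one batch of ingested
// spans. The address and the count values below are made-up examples.
func exampleNewMetricsSink(cnf *conf.Config) *MetricsSink {
	msink := NewMetricsSink(cnf)
	// 100 spans arrived from this client; the server dropped 2 of them, the
	// client itself estimated 1 drop, and writing the batch took 5 ms.
	msink.UpdateIngested("192.168.1.10", 100, 2, 1, 5*time.Millisecond)
	return msink
}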
// Update the total number of spans which were ingested, as well as other
// metrics that get updated during span ingest.
func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
	serverDropped int, clientDroppedEstimate int, wsLatency time.Duration) {
	msink.lock.Lock()
	defer msink.lock.Unlock()
	msink.IngestedSpans += uint64(totalIngested)
	msink.ServerDropped += uint64(serverDropped)
	msink.ClientDroppedEstimate += uint64(clientDroppedEstimate)
	msink.updateSpanMetrics(addr, 0, serverDropped, clientDroppedEstimate)
	// Record the writeSpan latency in milliseconds, clamped so that it fits
	// in a uint32 slot of the circular buffer.
	wsLatencyMs := wsLatency.Nanoseconds() / 1000000
	var wsLatency32 uint32
	if wsLatencyMs > math.MaxUint32 {
		wsLatency32 = math.MaxUint32
	} else {
		wsLatency32 = uint32(wsLatencyMs)
	}
	msink.wsLatencyCircBuf.Append(wsLatency32)
}
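// Illustrative sketch, not part of the original htraced source: one possible
// way for an ingest path to time a batch write and report it here. The helper
// name and the zero drop counts are assumptions for the example.
func exampleTimedIngest(msink *MetricsSink, addr string, numSpans int) {
	startTime := time.Now()
	// ... hand the batch of spans off for writing here ...
	msink.UpdateIngested(addr, numSpans, 0, 0, time.Since(startTime))
}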
// Update the per-host span metrics.  Must be called with the lock held.
func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
	serverDropped int, clientDroppedEstimate int) {
	mtx, found := msink.HostSpanMetrics[addr]
	if !found {
		// Ensure that the per-host span metrics map doesn't grow too large.
		if len(msink.HostSpanMetrics) >= msink.maxMtx {
			// Delete an effectively random entry: Go map iteration order is
			// unspecified, so the first key we range over is arbitrary.
			for k := range msink.HostSpanMetrics {
				msink.lg.Warnf("Evicting metrics entry for addr %s "+
					"because there are more than %d addrs.\n", k, msink.maxMtx)
				delete(msink.HostSpanMetrics, k)
				break
			}
		}
		mtx = &common.SpanMetrics{}
		msink.HostSpanMetrics[addr] = mtx
	}
	mtx.Written += uint64(numWritten)
	mtx.ServerDropped += uint64(serverDropped)
	mtx.ClientDroppedEstimate += uint64(clientDroppedEstimate)
}
// Update the total number of spans which were persisted to disk.
func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
	serverDropped int) {
	msink.lock.Lock()
	defer msink.lock.Unlock()
	msink.WrittenSpans += uint64(totalWritten)
	msink.ServerDropped += uint64(serverDropped)
	msink.updateSpanMetrics(addr, totalWritten, serverDropped, 0)
}
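// Illustrative sketch, not part of the original htraced source: after flushing
// a batch to leveldb, the writer could report how many spans from a given
// client address were persisted and how many it had to drop. The function and
// parameter names are assumptions.
func exampleReportPersisted(msink *MetricsSink, addr string, written, dropped int) {
	msink.UpdatePersisted(addr, written, dropped)
}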
// Read the server stats.
func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
	msink.lock.Lock()
	defer msink.lock.Unlock()
	stats.IngestedSpans = msink.IngestedSpans
	stats.WrittenSpans = msink.WrittenSpans
	stats.ServerDroppedSpans = msink.ServerDropped
	stats.ClientDroppedEstimate = msink.ClientDroppedEstimate
	stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
	stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
	// Deep-copy the per-host metrics so that callers never share the map
	// entries we continue to mutate under the lock.
	stats.HostSpanMetrics = make(common.SpanMetricsMap)
	for k, v := range msink.HostSpanMetrics {
		stats.HostSpanMetrics[k] = &common.SpanMetrics{
			Written:               v.Written,
			ServerDropped:         v.ServerDropped,
			ClientDroppedEstimate: v.ClientDroppedEstimate,
		}
	}
}
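// Illustrative sketch, not part of the original htraced source: something like
// an HTTP stats handler could snapshot the metrics and derive a server drop
// ratio from the copied counters. The function name is an assumption.
func exampleServerDropRatio(msink *MetricsSink) float64 {
	var stats common.ServerStats
	msink.PopulateServerStats(&stats)
	if stats.IngestedSpans == 0 {
		return 0.0
	}
	return float64(stats.ServerDroppedSpans) / float64(stats.IngestedSpans)
}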
// A circular buffer of uint32s which supports appending values and taking
// the maximum and the average of its contents.
type CircBufU32 struct {
	// The next slot to fill.
	slot int
	// The number of slots which are in use.  This number only ever
	// increases until the buffer is full.
	slotsUsed int
	// The buffer.
	buf []uint32
}
func NewCircBufU32(size int) *CircBufU32 {
	return &CircBufU32{
		// Starting slotsUsed at -1 rather than 0 keeps Average from
		// dividing by zero while the buffer is still empty.
		slotsUsed: -1,
		buf:       make([]uint32, size),
	}
}
func (cbuf *CircBufU32) Max() uint32 {
	var max uint32
	for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
		if cbuf.buf[bufIdx] > max {
			max = cbuf.buf[bufIdx]
		}
	}
	return max
}
// Average returns the integer mean of the values currently held in the
// buffer, or 0 if nothing has been appended yet.
func (cbuf *CircBufU32) Average() uint32 {
	var total uint64
	for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
		total += uint64(cbuf.buf[bufIdx])
	}
	return uint32(total / uint64(cbuf.slotsUsed))
}
func (cbuf *CircBufU32) Append(val uint32) {
	cbuf.buf[cbuf.slot] = val
	cbuf.slot++
	if cbuf.slotsUsed < cbuf.slot {
		cbuf.slotsUsed = cbuf.slot
	}
	if cbuf.slot >= len(cbuf.buf) {
		cbuf.slot = 0
	}
}
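// Illustrative sketch, not part of the original htraced source: exercising
// CircBufU32 directly. With a hypothetical 4-slot buffer, appending 1..5
// overwrites the oldest value, so Max() scans {5, 2, 3, 4} and returns 5,
// while Average() is the integer mean of those four values, 3.
func exampleCircBuf() (uint32, uint32) {
	cbuf := NewCircBufU32(4)
	for _, v := range []uint32{1, 2, 3, 4, 5} {
		cbuf.Append(v)
	}
	return cbuf.Max(), cbuf.Average()
}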