blob: 666670312cd52e173e780a5bf99870723ea2b548 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package prometheus
import (
import (
ocprom ""
prom ""
import (
const (
reporterName = "prometheus"
serviceKey = constant.ServiceKey
groupKey = constant.GroupKey
versionKey = constant.VersionKey
methodKey = constant.MethodKey
timeoutKey = constant.TimeoutKey
// to identify side
providerPrefix = "provider_"
consumerPrefix = "consumer_"
// to identify the metric's type
rtSuffix = "_rt"
// to identify the metric's type
tpsSuffix = "_tps"
var (
labelNames = []string{serviceKey, groupKey, versionKey, methodKey, timeoutKey}
reporterInstance *PrometheusReporter
reporterInitOnce sync.Once
defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000}
// should initialize after loading configuration
func init() {
extension.SetMetricReporter(reporterName, newPrometheusReporter)
// PrometheusReporter will collect the data for Prometheus
// if you want to use this feature, you need to initialize your prometheus.
type PrometheusReporter struct {
reporterServer *http.Server
reporterConfig *metrics.ReporterConfig
// report the consumer-side's rt gauge data
consumerRTSummaryVec *prometheus.SummaryVec
// report the provider-side's rt gauge data
providerRTSummaryVec *prometheus.SummaryVec
// todo tps support
// report the consumer-side's tps gauge data
consumerTPSGaugeVec *prometheus.GaugeVec
// report the provider-side's tps gauge data
providerTPSGaugeVec *prometheus.GaugeVec
userGauge sync.Map
userSummary sync.Map
userCounter sync.Map
userCounterVec sync.Map
userGaugeVec sync.Map
userSummaryVec sync.Map
namespace string
// Report reports the duration to Prometheus
// the role in url must be consumer or provider
// or it will be ignored
func (reporter *PrometheusReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
if !reporter.reporterConfig.Enable {
url := invoker.GetURL()
var rtVec *prometheus.SummaryVec
if isProvider(url) {
rtVec = reporter.providerRTSummaryVec
} else if isConsumer(url) {
rtVec = reporter.consumerRTSummaryVec
} else {
logger.Warnf("The url belongs neither the consumer nor the provider, "+
"so the invocation will be ignored. url: %s", url.String())
labels := prometheus.Labels{
serviceKey: url.Service(),
groupKey: url.GetParam(groupKey, ""),
versionKey: url.GetParam(constant.AppVersionKey, ""),
methodKey: invocation.MethodName(),
timeoutKey: url.GetParam(timeoutKey, ""),
costMs := cost.Nanoseconds()
func newHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec {
return prometheus.NewHistogramVec(
Namespace: namespace,
Name: name,
Buckets: defaultHistogramBucket,
func newCounter(name, namespace string) prometheus.Counter {
return prometheus.NewCounter(
Namespace: namespace,
Name: name,
func newCounterVec(name, namespace string, labels []string) *prometheus.CounterVec {
return prometheus.NewCounterVec(
Name: name,
Namespace: namespace,
}, labels)
func newGauge(name, namespace string) prometheus.Gauge {
return prometheus.NewGauge(
Name: name,
Namespace: namespace,
func newGaugeVec(name, namespace string, labels []string) *prometheus.GaugeVec {
return prometheus.NewGaugeVec(
Name: name,
Namespace: namespace,
}, labels)
func newSummary(name, namespace string) prometheus.Summary {
return prometheus.NewSummary(
Name: name,
Namespace: namespace,
// newSummaryVec create SummaryVec, the Namespace is dubbo
// the objectives is from my experience.
func newSummaryVec(name, namespace string, labels []string, maxAge int64) *prometheus.SummaryVec {
return prometheus.NewSummaryVec(
Namespace: namespace,
Name: name,
Objectives: map[float64]float64{
0.5: 0.01,
0.75: 0.01,
0.90: 0.005,
0.98: 0.002,
0.99: 0.001,
0.999: 0.0001,
MaxAge: time.Duration(maxAge),
// isProvider shows whether this url represents the application received the request as server
func isProvider(url *common.URL) bool {
role := url.GetParam(constant.RegistryRoleKey, "")
return strings.EqualFold(role, strconv.Itoa(common.PROVIDER))
// isConsumer shows whether this url represents the application sent then request as client
func isConsumer(url *common.URL) bool {
role := url.GetParam(constant.RegistryRoleKey, "")
return strings.EqualFold(role, strconv.Itoa(common.CONSUMER))
// newPrometheusReporter create new prometheusReporter
// it will register the metrics into prometheus
func newPrometheusReporter(reporterConfig *metrics.ReporterConfig) metrics.Reporter {
if reporterInstance == nil {
reporterInitOnce.Do(func() {
reporterInstance = &PrometheusReporter{
reporterConfig: reporterConfig,
namespace: reporterConfig.Namespace,
consumerRTSummaryVec: newSummaryVec(consumerPrefix+serviceKey+rtSuffix, reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge),
providerRTSummaryVec: newSummaryVec(providerPrefix+serviceKey+rtSuffix, reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge),
prom.DefaultRegisterer.MustRegister(reporterInstance.consumerRTSummaryVec, reporterInstance.providerRTSummaryVec)
if reporterConfig.Enable {
if reporterConfig.Mode == metrics.ReportModePull {
go reporterInstance.startupServer(reporterConfig)
// todo pushgateway support
} else {
return reporterInstance
// setGauge set gauge to target value with given label, if label is not empty, set gauge vec
// if target gauge/gaugevec not exist, just create new gauge and set the value
func (reporter *PrometheusReporter) setGauge(gaugeName string, toSetValue float64, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// gauge
if val, exist := reporter.userGauge.Load(gaugeName); !exist {
gauge := newGauge(gaugeName, reporter.namespace)
err := prom.DefaultRegisterer.Register(gauge)
if err == nil {
reporter.userGauge.Store(gaugeName, gauge)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A gauge for that metric has been registered before.
// Use the old gauge from now on.
} else {
// gauge vec
if val, exist := reporter.userGaugeVec.Load(gaugeName); !exist {
keyList := make([]string, 0)
for k, _ := range labelMap {
keyList = append(keyList, k)
gaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList)
err := prom.DefaultRegisterer.Register(gaugeVec)
if err == nil {
reporter.userGaugeVec.Store(gaugeName, gaugeVec)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A gauge for that metric has been registered before.
// Use the old gauge from now on.
} else {
// incCounter inc counter to inc if label is not empty, set counter vec
// if target counter/counterVec not exist, just create new counter and inc the value
func (reporter *PrometheusReporter) incCounter(counterName string, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// counter
if val, exist := reporter.userCounter.Load(counterName); !exist {
counter := newCounter(counterName, reporter.namespace)
err := prom.DefaultRegisterer.Register(counter)
if err == nil {
reporter.userCounter.Store(counterName, counter)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A counter for that metric has been registered before.
// Use the old counter from now on.
} else {
// counter vec inc
if val, exist := reporter.userCounterVec.Load(counterName); !exist {
keyList := make([]string, 0)
for k, _ := range labelMap {
keyList = append(keyList, k)
counterVec := newCounterVec(counterName, reporter.namespace, keyList)
err := prom.DefaultRegisterer.Register(counterVec)
if err == nil {
reporter.userCounterVec.Store(counterName, counterVec)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A counter for that metric has been registered before.
// Use the old counter from now on.
} else {
// incSummary inc summary to target value with given label, if label is not empty, set summary vec
// if target summary/summaryVec not exist, just create new summary and set the value
func (reporter *PrometheusReporter) incSummary(summaryName string, toSetValue float64, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// summary
if val, exist := reporter.userSummary.Load(summaryName); !exist {
summary := newSummary(summaryName, reporter.namespace)
err := prom.DefaultRegisterer.Register(summary)
if err == nil {
reporter.userSummary.Store(summaryName, summary)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A summary for that metric has been registered before.
// Use the old summary from now on.
} else {
// summary vec
if val, exist := reporter.userSummaryVec.Load(summaryName); !exist {
keyList := make([]string, 0)
for k, _ := range labelMap {
keyList = append(keyList, k)
summaryVec := newSummaryVec(summaryName, reporter.namespace, keyList, reporter.reporterConfig.SummaryMaxAge)
err := prom.DefaultRegisterer.Register(summaryVec)
if err == nil {
reporter.userSummaryVec.Store(summaryName, summaryVec)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A summary for that metric has been registered before.
// Use the old summary from now on.
} else {
func SetGaugeWithLabel(gaugeName string, val float64, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.setGauge(gaugeName, val, label)
func SetGauge(gaugeName string, val float64) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.setGauge(gaugeName, val, make(prometheus.Labels))
func IncCounterWithLabel(counterName string, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incCounter(counterName, label)
func IncCounter(summaryName string) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incCounter(summaryName, make(prometheus.Labels))
func IncSummaryWithLabel(counterName string, val float64, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incSummary(counterName, val, label)
func IncSummary(summaryName string, val float64) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incSummary(summaryName, val, make(prometheus.Labels))
func (reporter *PrometheusReporter) startupServer(reporterConfig *metrics.ReporterConfig) {
metricsExporter, err := ocprom.NewExporter(ocprom.Options{
Registry: prom.DefaultRegisterer.(*prom.Registry),
if err != nil {
logger.Errorf("new prometheus reporter with error = %s", err)
// start server
mux := http.NewServeMux()
mux.Handle(reporterConfig.Path, metricsExporter)
reporterInstance.reporterServer = &http.Server{Addr: ":" + reporterConfig.Port, Handler: mux}
if err := reporterInstance.reporterServer.ListenAndServe(); err != nil {
logger.Warnf("new prometheus reporter with error = %s", err)
func (reporter *PrometheusReporter) shutdownServer() {
if reporterInstance.reporterServer != nil {
err := reporterInstance.reporterServer.Shutdown(context.Background())
if err != nil {
logger.Errorf("shutdown prometheus reporter with error = %s, prometheus reporter close now", err)