blob: 058d4928d5a6f5e135e7d448a950fd43935c3601 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package prometheus
import (
"bytes"
"errors"
"net/http"
"os"
"strconv"
"sync"
"time"
)
import (
"github.com/dubbo-go-pixiu/pixiu-api/pkg/context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/client"
contextHttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)
// defaultSubsystem is the Subsystem that NewPrometheus assigns to new
// instances; it is passed to NewMetric as the subsystem of every collector.
var defaultSubsystem = "pixiu"
// ContextHandlerFunc is the handler signature returned by
// (*Prometheus).HandlerFunc: it receives the HTTP context of a request and
// reports an error.
type ContextHandlerFunc func(c *contextHttp.HttpContext) error
// Binary size units expressed as float64 so they can be used directly as
// histogram bucket bounds. Each unit is 1024 times the previous one.
const (
	KB float64 = 1 << 10 // 1024 bytes
	MB float64 = 1 << 20 // 1024 KB
	GB float64 = 1 << 30 // 1024 MB
	TB float64 = 1 << 40 // 1024 GB
)
// FavContextKeyType is a string-based named type. It is not referenced in
// this part of the file.
// NOTE(review): presumably intended as a dedicated context-key type — confirm
// against its users.
type FavContextKeyType string
// Metric is the declarative definition of one Prometheus metric. NewMetric
// turns a definition into a prometheus.Collector.
type Metric struct {
// MetricCollector is the collector created for this definition; it is
// filled in by (*Prometheus).registerMetrics.
MetricCollector prometheus.Collector
// ID is an identifier for the definition (e.g. "reqCnt").
ID string
// Name is used as the metric name in the collector options.
Name string
// Description is used as the metric help text.
Description string
// Type selects the collector kind built by NewMetric: counter, counter_vec,
// gauge, gauge_vec, histogram, histogram_vec, summary or summary_vec.
Type string
// Args are the label names handed to the *_vec constructors.
Args []string
// Buckets are the histogram bucket bounds; only the histogram and
// histogram_vec types read them.
Buckets []float64
}
// reqDurBuckets is the buckets for request duration. Here, we use the prometheus defaults
// which are for ~10s request length max: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
// Note that this is the prometheus.DefBuckets slice itself, not a copy.
var reqDurBuckets = prometheus.DefBuckets
// reqSzBuckets is the buckets for request size. Here we define a spectrum
// from 1KB through 1MB up to 10MB.
var reqSzBuckets = []float64{
	KB, 2 * KB, 5 * KB, 10 * KB, 100 * KB, 500 * KB,
	MB, 2.5 * MB, 5 * MB, 10 * MB,
}
// resSzBuckets is the buckets for response size. Here we define a spectrum
// from 1KB through 1MB up to 10MB.
var resSzBuckets = []float64{
	KB, 2 * KB, 5 * KB, 10 * KB, 100 * KB, 500 * KB,
	MB, 2.5 * MB, 5 * MB, 10 * MB,
}
// Standard default metrics.
//
// The Type of a Metric is one of:
// counter, counter_vec, gauge, gauge_vec,
// histogram, histogram_vec, summary, summary_vec

// reqCnt defines the request counter, labelled by status code, HTTP method,
// host and URL.
var reqCnt = &Metric{
ID: "reqCnt",
Name: "requests_total",
Description: "How many HTTP requests processed, partitioned by status code and HTTP method.",
Type: "counter_vec",
Args: []string{"code", "method", "host", "url"},
}
// reqDur defines the request latency histogram (seconds), labelled by status
// code, HTTP method and URL, using reqDurBuckets as bucket bounds.
var reqDur = &Metric{
	ID:          "reqDur",
	Name:        "request_duration_seconds",
	Description: "The HTTP request latencies in seconds.",
	Type:        "histogram_vec",
	Args:        []string{"code", "method", "url"},
	Buckets:     reqDurBuckets,
}
// resSz defines the response size histogram (bytes), labelled by status code,
// HTTP method and URL, using resSzBuckets as bucket bounds.
var resSz = &Metric{
	ID:          "resSz",
	Name:        "response_size_bytes",
	Description: "The HTTP response sizes in bytes.",
	Type:        "histogram_vec",
	Args:        []string{"code", "method", "url"},
	Buckets:     resSzBuckets,
}
// reqSz defines the request size histogram (bytes), labelled by status code,
// HTTP method and URL, using reqSzBuckets as bucket bounds.
var reqSz = &Metric{
	ID:          "reqSz",
	Name:        "request_size_bytes",
	Description: "The HTTP request sizes in bytes.",
	Type:        "histogram_vec",
	Args:        []string{"code", "method", "url"},
	Buckets:     reqSzBuckets,
}
// standardMetrics lists the metric definitions every Prometheus instance
// created by NewPrometheus starts with.
var standardMetrics = []*Metric{reqCnt, reqDur, resSz, reqSz}
// NewMetric builds the prometheus.Collector described by m.
//
// m.Type selects the collector kind; m.Name and m.Description become the
// metric name and help text, subsystem becomes the metric subsystem, m.Args
// supplies the label names of the *_vec kinds and m.Buckets the bucket bounds
// of the histogram kinds. For any other m.Type the result is nil.
func NewMetric(m *Metric, subsystem string) prometheus.Collector {
	// The option structs are plain values, so preparing all of them up front
	// has no side effects; only the one matching m.Type is used.
	counterOpts := prometheus.CounterOpts{
		Subsystem: subsystem,
		Name:      m.Name,
		Help:      m.Description,
	}
	gaugeOpts := prometheus.GaugeOpts{
		Subsystem: subsystem,
		Name:      m.Name,
		Help:      m.Description,
	}
	histogramOpts := prometheus.HistogramOpts{
		Subsystem: subsystem,
		Name:      m.Name,
		Help:      m.Description,
		Buckets:   m.Buckets,
	}
	summaryOpts := prometheus.SummaryOpts{
		Subsystem: subsystem,
		Name:      m.Name,
		Help:      m.Description,
	}

	switch m.Type {
	case "counter_vec":
		return prometheus.NewCounterVec(counterOpts, m.Args)
	case "counter":
		return prometheus.NewCounter(counterOpts)
	case "gauge_vec":
		return prometheus.NewGaugeVec(gaugeOpts, m.Args)
	case "gauge":
		return prometheus.NewGauge(gaugeOpts)
	case "histogram_vec":
		return prometheus.NewHistogramVec(histogramOpts, m.Args)
	case "histogram":
		return prometheus.NewHistogram(histogramOpts)
	case "summary_vec":
		return prometheus.NewSummaryVec(summaryOpts, m.Args)
	case "summary":
		return prometheus.NewSummary(summaryOpts)
	default:
		// Unknown type: no collector.
		return nil
	}
}
// RequestCounterLabelMappingFunc derives a label value from the HTTP context
// of a request. Prometheus uses functions of this type to produce the "url"
// and "host" label values.
type RequestCounterLabelMappingFunc func(c *contextHttp.HttpContext) string
// Prometheus holds the collectors and settings used to record HTTP request
// metrics and, optionally, push them to a push gateway.
type Prometheus struct {
// reqCnt counts handled requests; set by registerMetrics.
reqCnt *prometheus.CounterVec
// reqDur, reqSz and resSz are the duration, request-size and response-size
// histograms; set by registerMetrics.
reqDur, reqSz, resSz *prometheus.HistogramVec
// Ppg is the push gateway configuration and its push counter state.
Ppg PushGateway
// MetricsList holds the metric definitions that registerMetrics registers.
MetricsList []*Metric
// MetricsPath is appended to Ppg.PushGatewayURL when the push URL is built.
MetricsPath string
// Subsystem is passed to NewMetric for every collector.
Subsystem string
// RequestCounterURLLabelMappingFunc produces the "url" label value.
RequestCounterURLLabelMappingFunc RequestCounterLabelMappingFunc
// RequestCounterHostLabelMappingFunc produces the "host" label value.
RequestCounterHostLabelMappingFunc RequestCounterLabelMappingFunc
// URLLabelFromContext is not read in this part of the file.
URLLabelFromContext string
// Datacontext is not read in this part of the file.
Datacontext context.Context
}
// PushGateway contains the configuration for pushing to a Prometheus pushgateway (optional)
type PushGateway struct {
// CounterPush enables the request-count based push check in SetPushGateway.
CounterPush bool
// PushIntervalSeconds is not read in this part of the file.
// NOTE(review): the type is time.Duration although the name says seconds —
// confirm the intended unit with its users.
PushIntervalSeconds time.Duration
// PushIntervalThreshold is the counter value at which a push is triggered.
PushIntervalThreshold int
// PushGatewayURL is the base of the push URL.
PushGatewayURL string
// Job is the job name placed in the push URL; getPushGatewayURL sets it to
// "pixiu" when it is empty.
Job string
// counter is incremented by HandlerFunc for every handled request and reset
// by startPushCounter when a push is triggered.
counter int
// mutex is held by HandlerFunc while it updates counter and runs the push check.
mutex sync.RWMutex
}
// NewPrometheus creates a Prometheus instance that uses the default subsystem
// and the standard metric definitions, then registers its collectors.
//
// By default the "url" label is taken from c.GetUrl() and the "host" label
// from c.Request.Host.
func NewPrometheus() *Prometheus {
	urlLabel := func(c *contextHttp.HttpContext) string {
		return c.GetUrl()
	}
	hostLabel := func(c *contextHttp.HttpContext) string {
		return c.Request.Host
	}

	prom := &Prometheus{
		// A fresh slice, so MetricsList does not share a backing array with
		// standardMetrics (the *Metric elements themselves are shared).
		MetricsList:                        append([]*Metric(nil), standardMetrics...),
		Subsystem:                          defaultSubsystem,
		RequestCounterURLLabelMappingFunc:  urlLabel,
		RequestCounterHostLabelMappingFunc: hostLabel,
	}
	prom.registerMetrics()
	return prom
}
// registerMetrics creates a collector for every definition in p.MetricsList,
// registers it with the default Prometheus registerer and stores it both on
// the definition (MetricCollector) and, for the standard metrics, in the typed
// fields that HandlerFunc records to.
//
// If an equivalent collector is already registered (for instance because
// NewPrometheus was called more than once in the same process), the collector
// that is already registered is reused. Keeping the freshly built one instead
// would mean recording into a collector that is never gathered.
// Any other registration error is logged and the new collector is still used.
func (p *Prometheus) registerMetrics() {
	for _, metricDef := range p.MetricsList {
		metric := NewMetric(metricDef, p.Subsystem)
		if metric == nil {
			// NewMetric returns nil for a Type it does not know; there is
			// nothing to register for such a definition.
			logger.Errorf("%s has unsupported metric type %q and was not registered", metricDef.Name, metricDef.Type)
			continue
		}

		if err := prometheus.Register(metric); err != nil {
			var already prometheus.AlreadyRegisteredError
			if errors.As(err, &already) {
				// Record into the collector the registry actually exports.
				metric = already.ExistingCollector
			} else {
				logger.Errorf("%s could not be registered in Prometheus: %v", metricDef.Name, err)
			}
		}

		// Definitions are compared by pointer identity with the package-level
		// standard metrics.
		switch metricDef {
		case reqCnt:
			p.reqCnt = metric.(*prometheus.CounterVec)
		case reqDur:
			p.reqDur = metric.(*prometheus.HistogramVec)
		case resSz:
			p.resSz = metric.(*prometheus.HistogramVec)
		case reqSz:
			p.reqSz = metric.(*prometheus.HistogramVec)
		}
		metricDef.MetricCollector = metric
	}
}
// SetPushGatewayUrl stores the push gateway base URL and the metrics path
// that is appended to it when the push URL is built.
func (p *Prometheus) SetPushGatewayUrl(pushGatewayURL, metricspath string) {
	p.MetricsPath = metricspath
	p.Ppg.PushGatewayURL = pushGatewayURL
}
// SetPushIntervalThreshold switches counter-based pushing on or off (isTurn)
// and sets the number of handled requests after which a push is triggered.
func (p *Prometheus) SetPushIntervalThreshold(isTurn bool, pushIntervalThreshold int) {
	p.Ppg.PushIntervalThreshold = pushIntervalThreshold
	p.Ppg.CounterPush = isTurn
}
// SetPushGatewayJob sets the job name that is placed in the push gateway URL.
func (p *Prometheus) SetPushGatewayJob(j string) {
p.Ppg.Job = j
}
// startPushCounter triggers an asynchronous push once the request counter has
// reached PushIntervalThreshold, and resets the counter.
//
// The gathering and text encoding of the metrics happens inside the spawned
// goroutine. Arguments of a `go f(args)` statement are evaluated by the
// calling goroutine, so writing `go p.sendMetricsToPushGateway(p.getMetrics())`
// would run the whole gather on the caller; HandlerFunc reaches this method
// (via SetPushGateway) while holding p.Ppg.mutex, which would stall every
// concurrent request for the duration of the gather.
func (p *Prometheus) startPushCounter() {
	if p.Ppg.counter < p.Ppg.PushIntervalThreshold {
		return
	}
	p.Ppg.counter = 0
	go func() {
		p.sendMetricsToPushGateway(p.getMetrics())
	}()
}
// SetPushGateway runs the counter-based push check when CounterPush is
// enabled; otherwise it does nothing.
func (p *Prometheus) SetPushGateway() {
	if !p.Ppg.CounterPush {
		return
	}
	p.startPushCounter()
}
// getMetrics gathers all metric families from prometheus.DefaultGatherer and
// returns them encoded in the text format.
//
// A gather error is logged and yields an empty, non-nil slice. An encoding
// error for a single family is logged and the remaining families are still
// encoded.
func (p *Prometheus) getMetrics() []byte {
	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		logger.Errorf("prometheus.DefaultGatherer.Gather error: %v", err)
		return []byte{}
	}

	var buf bytes.Buffer
	for _, family := range families {
		if _, encErr := expfmt.MetricFamilyToText(&buf, family); encErr != nil {
			logger.Errorf("failed to converts a MetricFamily proto message into text format %v", encErr)
		}
	}
	return buf.Bytes()
}
// sendMetricsToPushGateway POSTs the given text-format metrics to the URL
// returned by getPushGatewayURL. It is best effort: every failure is logged
// and nothing is returned to the caller.
func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) {
	req, err := http.NewRequest(http.MethodPost, p.getPushGatewayURL(), bytes.NewBuffer(metrics))
	if err != nil {
		logger.Errorf("failed to create push gateway request: %v", err)
		return
	}

	// Bound the whole exchange: this runs on a background goroutine, and a
	// gateway that never answers would otherwise keep it alive indefinitely.
	httpClient := &http.Client{Timeout: 10 * time.Second}
	resp, err := httpClient.Do(req)
	if err != nil {
		logger.Errorf("Error sending to push gateway: %v", err)
		return
	}
	// The response body must be closed even though it is not read; otherwise
	// the underlying connection is leaked.
	defer resp.Body.Close()

	// A completed request is not necessarily an accepted push.
	if resp.StatusCode >= http.StatusBadRequest {
		logger.Errorf("push gateway rejected metrics with status %s", resp.Status)
	}
}
// getPushGatewayURL builds the push URL as
// <PushGatewayURL><MetricsPath>/job/<Job>/instance/<hostname>.
//
// As a side effect an empty p.Ppg.Job is set to "pixiu".
// NOTE(review): this method is reached from the goroutine started by
// startPushCounter, and the write to p.Ppg.Job is not guarded by
// p.Ppg.mutex — confirm that concurrent pushes cannot race on it.
func (p *Prometheus) getPushGatewayURL() string {
	gateway := &p.Ppg
	if gateway.Job == "" {
		gateway.Job = "pixiu"
	}
	// The hostname error is deliberately ignored; whatever name was returned
	// is used as the instance segment.
	hostname, _ := os.Hostname()
	return gateway.PushGatewayURL + p.MetricsPath + "/job/" + gateway.Job + "/instance/" + hostname
}
// HandlerFunc returns the handler that records the metrics of one request:
// duration, request count, approximate request size and response size. It
// then increments the push counter and runs the push check while holding
// p.Ppg.mutex. The returned handler always returns nil.
//
// Size observations are skipped when the request (or the target response) is
// nil.
func (p *Prometheus) HandlerFunc() ContextHandlerFunc {
	return func(c *contextHttp.HttpContext) error {
		begin := time.Now()
		requestBytes, requestErr := computeApproximateRequestSize(c.Request)
		// NOTE(review): the interval measured here only spans the size
		// computation above, not the handling of the request — confirm this is
		// the intended meaning of request_duration_seconds.
		seconds := float64(time.Since(begin)) / float64(time.Second)

		url := p.RequestCounterURLLabelMappingFunc(c)
		code := strconv.Itoa(c.GetStatusCode())
		method := c.GetMethod()

		p.reqDur.WithLabelValues(code, method, url).Observe(seconds)
		p.reqCnt.WithLabelValues(code, method, p.RequestCounterHostLabelMappingFunc(c), url).Inc()
		if requestErr == nil {
			p.reqSz.WithLabelValues(code, method, url).Observe(float64(requestBytes))
		}
		if responseBytes, responseErr := computeApproximateResponseSize(c.TargetResp); responseErr == nil {
			p.resSz.WithLabelValues(code, method, url).Observe(float64(responseBytes))
		}

		// The counter update and the push check run under the same lock.
		p.Ppg.mutex.Lock()
		defer p.Ppg.mutex.Unlock()
		p.Ppg.counter++
		p.SetPushGateway()
		return nil
	}
}
// computeApproximateRequestSize estimates the size of r in bytes as the sum of
// the lengths of the URL path, method, protocol, host, all header names and
// values, plus the declared content length (ignored when it is -1).
// It returns an error when r is nil.
func computeApproximateRequestSize(r *http.Request) (int, error) {
	if r == nil {
		return 0, errors.New("http.Request is null pointer ")
	}

	size := len(r.Method) + len(r.Proto) + len(r.Host)
	if r.URL != nil {
		size += len(r.URL.Path)
	}
	for key, vals := range r.Header {
		size += len(key)
		for i := range vals {
			size += len(vals[i])
		}
	}
	// A content length of -1 is excluded from the estimate.
	if r.ContentLength != -1 {
		size += int(r.ContentLength)
	}
	return size, nil
}
// computeApproximateResponseSize returns the length of res.Data, or an error
// when res is nil.
func computeApproximateResponseSize(res *client.Response) (int, error) {
	if res != nil {
		return len(res.Data), nil
	}
	return 0, errors.New("client.Response is null pointer ")
}