| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package prometheus |
| |
| import ( |
| "bytes" |
| "errors" |
| "net/http" |
| "os" |
| "strconv" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/dubbo-go-pixiu/pixiu-api/pkg/context" |
| "github.com/prometheus/client_golang/prometheus" |
| "github.com/prometheus/common/expfmt" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pkg/client" |
| contextHttp "github.com/apache/dubbo-go-pixiu/pkg/context/http" |
| "github.com/apache/dubbo-go-pixiu/pkg/logger" |
| ) |
| |
| var defaultSubsystem = "pixiu" |
| |
| type ContextHandlerFunc func(c *contextHttp.HttpContext) error |
| |
| const ( |
| _ = iota // ignore first value by assigning to blank identifier |
| KB float64 = 1 << (10 * iota) |
| MB |
| GB |
| TB |
| ) |
| |
| type FavContextKeyType string |
| |
| type Metric struct { |
| MetricCollector prometheus.Collector |
| ID string |
| Name string |
| Description string |
| Type string |
| Args []string |
| Buckets []float64 |
| } |
| |
| // reqDurBuckets is the buckets for request duration. Here, we use the prometheus defaults |
| // which are for ~10s request length max: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} |
| var reqDurBuckets = prometheus.DefBuckets |
| |
| // reqSzBuckets is the buckets for request size. Here we define a spectrom from 1KB thru 1NB up to 10MB. |
| var reqSzBuckets = []float64{1.0 * KB, 2.0 * KB, 5.0 * KB, 10.0 * KB, 100 * KB, 500 * KB, 1.0 * MB, 2.5 * MB, 5.0 * MB, 10.0 * MB} |
| |
| // resSzBuckets is the buckets for response size. Here we define a spectrom from 1KB thru 1NB up to 10MB. |
| var resSzBuckets = []float64{1.0 * KB, 2.0 * KB, 5.0 * KB, 10.0 * KB, 100 * KB, 500 * KB, 1.0 * MB, 2.5 * MB, 5.0 * MB, 10.0 * MB} |
| |
| // Standard default metrics |
| // counter, counter_vec, gauge, gauge_vec, |
| // histogram, histogram_vec, summary, summary_vec |
| |
| var reqCnt = &Metric{ |
| ID: "reqCnt", |
| Name: "requests_total", |
| Description: "How many HTTP requests processed, partitioned by status code and HTTP method.", |
| Type: "counter_vec", |
| Args: []string{"code", "method", "host", "url"}, |
| } |
| |
| var reqDur = &Metric{ |
| ID: "reqDur", |
| Name: "request_duration_seconds", |
| Description: "The HTTP request latencies in seconds.", |
| Args: []string{"code", "method", "url"}, |
| Type: "histogram_vec", |
| Buckets: reqDurBuckets, |
| } |
| |
| var resSz = &Metric{ |
| ID: "resSz", |
| Name: "response_size_bytes", |
| Description: "The HTTP response sizes in bytes.", |
| Args: []string{"code", "method", "url"}, |
| Type: "histogram_vec", |
| Buckets: resSzBuckets, |
| } |
| |
| var reqSz = &Metric{ |
| ID: "reqSz", |
| Name: "request_size_bytes", |
| Description: "The HTTP request sizes in bytes.", |
| Args: []string{"code", "method", "url"}, |
| Type: "histogram_vec", |
| Buckets: reqSzBuckets, |
| } |
| |
| var standardMetrics = []*Metric{ |
| reqCnt, |
| reqDur, |
| resSz, |
| reqSz, |
| } |
| |
| // NewMetric associates prometheus.Collector based on Metric.Type |
| func NewMetric(m *Metric, subsystem string) prometheus.Collector { |
| var metric prometheus.Collector |
| switch m.Type { |
| case "counter_vec": |
| metric = prometheus.NewCounterVec( |
| prometheus.CounterOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| }, |
| m.Args, |
| ) |
| case "counter": |
| metric = prometheus.NewCounter( |
| prometheus.CounterOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| }, |
| ) |
| case "gauge_vec": |
| metric = prometheus.NewGaugeVec( |
| prometheus.GaugeOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| }, |
| m.Args, |
| ) |
| case "gauge": |
| metric = prometheus.NewGauge( |
| prometheus.GaugeOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| }, |
| ) |
| case "histogram_vec": |
| metric = prometheus.NewHistogramVec( |
| prometheus.HistogramOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| Buckets: m.Buckets, |
| }, |
| m.Args, |
| ) |
| case "histogram": |
| metric = prometheus.NewHistogram( |
| prometheus.HistogramOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| Buckets: m.Buckets, |
| }, |
| ) |
| case "summary_vec": |
| metric = prometheus.NewSummaryVec( |
| prometheus.SummaryOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| }, |
| m.Args, |
| ) |
| case "summary": |
| metric = prometheus.NewSummary( |
| prometheus.SummaryOpts{ |
| Subsystem: subsystem, |
| Name: m.Name, |
| Help: m.Description, |
| }, |
| ) |
| } |
| return metric |
| } |
| |
| type RequestCounterLabelMappingFunc func(c *contextHttp.HttpContext) string |
| |
| type Prometheus struct { |
| reqCnt *prometheus.CounterVec |
| reqDur, reqSz, resSz *prometheus.HistogramVec |
| Ppg PushGateway |
| |
| MetricsList []*Metric |
| MetricsPath string |
| Subsystem string |
| |
| RequestCounterURLLabelMappingFunc RequestCounterLabelMappingFunc |
| RequestCounterHostLabelMappingFunc RequestCounterLabelMappingFunc |
| |
| URLLabelFromContext string |
| Datacontext context.Context |
| } |
| |
| // PushGateway contains the configuration for pushing to a Prometheus pushgateway (optional) |
| type PushGateway struct { |
| CounterPush bool |
| PushIntervalSeconds time.Duration |
| PushIntervalThreshold int |
| PushGatewayURL string |
| Job string |
| counter int |
| mutex sync.RWMutex |
| } |
| |
| // NewPrometheus generates a new set of metrics with a certain subsystem name |
| func NewPrometheus() *Prometheus { |
| var metricsList []*Metric |
| metricsList = append(metricsList, standardMetrics...) |
| p := &Prometheus{ |
| MetricsList: metricsList, |
| Subsystem: defaultSubsystem, |
| RequestCounterURLLabelMappingFunc: func(c *contextHttp.HttpContext) string { |
| return c.GetUrl() |
| }, |
| RequestCounterHostLabelMappingFunc: func(c *contextHttp.HttpContext) string { |
| return c.Request.Host |
| }, |
| } |
| p.registerMetrics() |
| return p |
| } |
| |
| func (p *Prometheus) registerMetrics() { |
| for _, metricDef := range p.MetricsList { |
| metric := NewMetric(metricDef, p.Subsystem) |
| if err := prometheus.Register(metric); err != nil { |
| logger.Errorf("%s could not be registered in Prometheus: %v", metricDef.Name, err) |
| } |
| switch metricDef { |
| |
| case reqCnt: |
| p.reqCnt = metric.(*prometheus.CounterVec) |
| case reqDur: |
| p.reqDur = metric.(*prometheus.HistogramVec) |
| case resSz: |
| p.resSz = metric.(*prometheus.HistogramVec) |
| case reqSz: |
| p.reqSz = metric.(*prometheus.HistogramVec) |
| } |
| metricDef.MetricCollector = metric |
| } |
| } |
| |
| func (p *Prometheus) SetPushGatewayUrl(pushGatewayURL, metricspath string) { |
| |
| p.Ppg.PushGatewayURL = pushGatewayURL |
| p.MetricsPath = metricspath |
| } |
| |
| func (p *Prometheus) SetPushIntervalThreshold(isTurn bool, pushIntervalThreshold int) { |
| p.Ppg.CounterPush = isTurn |
| p.Ppg.PushIntervalThreshold = pushIntervalThreshold |
| } |
| |
| func (p *Prometheus) SetPushGatewayJob(j string) { |
| p.Ppg.Job = j |
| } |
| |
| func (p *Prometheus) startPushCounter() { |
| if p.Ppg.counter >= p.Ppg.PushIntervalThreshold { |
| go p.sendMetricsToPushGateway(p.getMetrics()) |
| p.Ppg.counter = 0 |
| } |
| } |
| |
| func (p *Prometheus) SetPushGateway() { |
| if p.Ppg.CounterPush { |
| p.startPushCounter() |
| } |
| } |
| |
| func (p *Prometheus) getMetrics() []byte { |
| out := &bytes.Buffer{} |
| metricFamilies, err := prometheus.DefaultGatherer.Gather() |
| if err != nil { |
| logger.Errorf("prometheus.DefaultGatherer.Gather error: %v", err) |
| return []byte{} |
| } |
| for i := range metricFamilies { |
| _, err := expfmt.MetricFamilyToText(out, metricFamilies[i]) |
| if err != nil { |
| logger.Errorf("failed to converts a MetricFamily proto message into text format %v", err) |
| } |
| } |
| return out.Bytes() |
| } |
| |
| func (p *Prometheus) sendMetricsToPushGateway(metrics []byte) { |
| req, err := http.NewRequest(http.MethodPost, p.getPushGatewayURL(), bytes.NewBuffer(metrics)) |
| if err != nil { |
| logger.Errorf("failed to create push gateway request: %v", err) |
| return |
| } |
| if _, err = (&http.Client{}).Do(req); err != nil { |
| logger.Errorf("Error sending to push gateway: %v", err) |
| } |
| } |
| |
| func (p *Prometheus) getPushGatewayURL() string { |
| h, _ := os.Hostname() |
| if p.Ppg.Job == "" { |
| p.Ppg.Job = "pixiu" |
| } |
| return p.Ppg.PushGatewayURL + p.MetricsPath + "/job/" + p.Ppg.Job + "/instance/" + h |
| } |
| |
| // HandlerFunc defines handler function for middleware |
| func (p *Prometheus) HandlerFunc() ContextHandlerFunc { |
| return func(c *contextHttp.HttpContext) error { |
| start := time.Now() |
| reqSz, err1 := computeApproximateRequestSize(c.Request) |
| //fmt.Println("reqSz", reqSz) |
| elapsed := float64(time.Since(start)) / float64(time.Second) |
| //fmt.Println("elapsed ", elapsed) |
| url := p.RequestCounterURLLabelMappingFunc(c) |
| //fmt.Println("url ", url) |
| statusStr := strconv.Itoa(c.GetStatusCode()) |
| //fmt.Println("statusStr", statusStr) |
| method := c.GetMethod() |
| //fmt.Println("method ", method) |
| p.reqDur.WithLabelValues(statusStr, method, url).Observe(elapsed) |
| p.reqCnt.WithLabelValues(statusStr, method, p.RequestCounterHostLabelMappingFunc(c), url).Inc() |
| if err1 == nil { |
| p.reqSz.WithLabelValues(statusStr, method, url).Observe(float64(reqSz)) |
| } |
| resSz, err2 := computeApproximateResponseSize(c.TargetResp) |
| if err2 == nil { |
| p.resSz.WithLabelValues(statusStr, method, url).Observe(float64(resSz)) |
| } |
| p.Ppg.mutex.Lock() |
| p.Ppg.counter = p.Ppg.counter + 1 |
| defer p.Ppg.mutex.Unlock() |
| p.SetPushGateway() |
| return nil |
| } |
| } |
| |
| func computeApproximateRequestSize(r *http.Request) (int, error) { |
| if r == nil { |
| return 0, errors.New("http.Request is null pointer ") |
| } |
| s := 0 |
| if r.URL != nil { |
| s = len(r.URL.Path) |
| } |
| s += len(r.Method) |
| s += len(r.Proto) |
| for name, values := range r.Header { |
| s += len(name) |
| for _, value := range values { |
| s += len(value) |
| } |
| } |
| s += len(r.Host) |
| if r.ContentLength != -1 { |
| s += int(r.ContentLength) |
| } |
| return s, nil |
| } |
| |
| func computeApproximateResponseSize(res *client.Response) (int, error) { |
| if res == nil { |
| return 0, errors.New("client.Response is null pointer ") |
| } |
| return len(res.Data), nil |
| } |