blob: 193d3f9ef3c88e28e9dd4f70c80cff2d21471726 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
import (
prometheusApi "github.com/prometheus/client_golang/api"
prometheusApiV1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
kubeApiMeta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
import (
istioKube "github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/test/env"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/istio"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/resource"
testKube "github.com/apache/dubbo-go-pixiu/pkg/test/kube"
"github.com/apache/dubbo-go-pixiu/pkg/test/scopes"
)
const (
serviceName = "prometheus"
appName = "prometheus"
)
var (
_ Instance = &kubeComponent{}
_ io.Closer = &kubeComponent{}
)
type kubeComponent struct {
id resource.ID
api map[string]prometheusApiV1.API
forwarder map[string]istioKube.PortForwarder
clusters cluster.Clusters
}
func getPrometheusYaml() (string, error) {
yamlBytes, err := os.ReadFile(filepath.Join(env.IstioSrc, "samples/addons/prometheus.yaml"))
if err != nil {
return "", err
}
yaml := string(yamlBytes)
// For faster tests, drop scrape interval
yaml = strings.ReplaceAll(yaml, "scrape_interval: 15s", "scrape_interval: 5s")
yaml = strings.ReplaceAll(yaml, "scrape_timeout: 10s", "scrape_timeout: 5s")
return yaml, nil
}
func installPrometheus(ctx resource.Context, ns string) error {
yaml, err := getPrometheusYaml()
if err != nil {
return err
}
if err := ctx.ConfigKube().YAML(ns, yaml).Apply(resource.NoCleanup); err != nil {
return err
}
ctx.ConditionalCleanup(func() {
_ = ctx.ConfigKube().YAML(ns, yaml).Delete()
})
return nil
}
func newKube(ctx resource.Context, cfgIn Config) (Instance, error) {
c := &kubeComponent{
clusters: ctx.Clusters(),
}
c.id = ctx.TrackResource(c)
c.api = make(map[string]prometheusApiV1.API)
c.forwarder = make(map[string]istioKube.PortForwarder)
cfg, err := istio.DefaultConfig(ctx)
if err != nil {
return nil, err
}
if !cfgIn.SkipDeploy {
if err := installPrometheus(ctx, cfg.TelemetryNamespace); err != nil {
return nil, err
}
}
for _, cls := range ctx.Clusters().Kube() {
scopes.Framework.Debugf("Installing Prometheus on cluster: %s", cls.Name())
// Find the Prometheus pod and service, and start forwarding a local port.
fetchFn := testKube.NewSinglePodFetch(cls, cfg.TelemetryNamespace, fmt.Sprintf("app=%s", appName))
pods, err := testKube.WaitUntilPodsAreReady(fetchFn)
if err != nil {
return nil, err
}
pod := pods[0]
svc, err := cls.CoreV1().Services(cfg.TelemetryNamespace).Get(context.TODO(), serviceName, kubeApiMeta.GetOptions{})
if err != nil {
return nil, err
}
port := uint16(svc.Spec.Ports[0].Port)
forwarder, err := cls.NewPortForwarder(pod.Name, pod.Namespace, "", 0, int(port))
if err != nil {
return nil, err
}
if err := forwarder.Start(); err != nil {
return nil, err
}
c.forwarder[cls.Name()] = forwarder
scopes.Framework.Debugf("initialized Prometheus port forwarder: %v", forwarder.Address())
address := fmt.Sprintf("http://%s", forwarder.Address())
var client prometheusApi.Client
client, err = prometheusApi.NewClient(prometheusApi.Config{Address: address})
if err != nil {
return nil, err
}
c.api[cls.Name()] = prometheusApiV1.NewAPI(client)
}
return c, nil
}
func (c *kubeComponent) ID() resource.ID {
return c.id
}
// API implements environment.DeployedPrometheus.
func (c *kubeComponent) API() prometheusApiV1.API {
return c.api[c.clusters.Default().Name()]
}
func (c *kubeComponent) APIForCluster(cluster cluster.Cluster) prometheusApiV1.API {
return c.api[cluster.Name()]
}
func (c *kubeComponent) Query(cluster cluster.Cluster, query Query) (model.Value, error) {
scopes.Framework.Debugf("Query running: %q", query)
v, _, err := c.api[cluster.Name()].Query(context.Background(), query.String(), time.Now())
if err != nil {
return nil, fmt.Errorf("error querying Prometheus: %v", err)
}
scopes.Framework.Debugf("Query received: %v", v)
switch v.Type() {
case model.ValScalar, model.ValString:
return v, nil
case model.ValVector:
value := v.(model.Vector)
if len(value) == 0 {
return nil, fmt.Errorf("value not found (query: %v)", query)
}
return v, nil
default:
return nil, fmt.Errorf("unhandled value type: %v", v.Type())
}
}
func (c *kubeComponent) QuerySum(cluster cluster.Cluster, query Query) (float64, error) {
val, err := c.Query(cluster, query)
if err != nil {
return 0, err
}
got, err := Sum(val)
if err != nil {
return 0, fmt.Errorf("could not find metric value: %v", err)
}
return got, nil
}
func Sum(val model.Value) (float64, error) {
if val.Type() != model.ValVector {
return 0, fmt.Errorf("value not a model.Vector; was %s", val.Type().String())
}
value := val.(model.Vector)
valueCount := 0.0
for _, sample := range value {
valueCount += float64(sample.Value)
}
if valueCount > 0.0 {
return valueCount, nil
}
return 0, fmt.Errorf("value not found")
}
// Close implements io.Closer.
func (c *kubeComponent) Close() error {
for _, forwarder := range c.forwarder {
forwarder.Close()
}
return nil
}
type Query struct {
Metric string
Aggregation string
Labels map[string]string
}
func (q Query) String() string {
query := q.Metric + `{`
keys := []string{}
for k := range q.Labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := q.Labels[k]
query += fmt.Sprintf(`%s=%q,`, k, v)
}
query += "}"
if q.Aggregation != "" {
query = fmt.Sprintf(`%s(%s)`, q.Aggregation, query)
}
return query
}