| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package provider |
| |
| import ( |
| "context" |
| "encoding/hex" |
| "flag" |
| "fmt" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| swctlapi "github.com/apache/skywalking-cli/api" |
| "github.com/apache/skywalking-cli/pkg/graphql/metrics" |
| "github.com/urfave/cli" |
| apierr "k8s.io/apimachinery/pkg/api/errors" |
| "k8s.io/apimachinery/pkg/api/resource" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/labels" |
| apischema "k8s.io/apimachinery/pkg/runtime/schema" |
| "k8s.io/klog/v2" |
| "k8s.io/metrics/pkg/apis/external_metrics" |
| apiprovider "sigs.k8s.io/custom-metrics-apiserver/pkg/provider" |
| ) |
| |
| const labelValueTypeStr string = "str" |
| const labelValueTypeByte string = "byte" |
| const stepMinute string = "2006-01-02 1504" |
| |
| var ( |
| NsGroupResource = apischema.GroupResource{Resource: "namespaces"} |
| ) |
| |
| // externalMetricsProvider is a implementation of provider.MetricsProvider which provides metrics from OAP |
| type externalMetricsProvider struct { |
| metricDefines []*swctlapi.MetricDefinition |
| lock sync.RWMutex |
| ctx *cli.Context |
| regex string |
| refreshRegistryInterval time.Duration |
| namespace string |
| } |
| |
| type stringValue string |
| |
| func (sv *stringValue) String() string { |
| return string(*sv) |
| } |
| |
| func (sv *stringValue) Set(s string) error { |
| *sv = stringValue(s) |
| return nil |
| } |
| |
| // NewProvider returns an instance of externalMetricsProvider |
| func NewProvider(baseURL string, metricRegex string, refreshRegistryInterval time.Duration, namespace string) (apiprovider.ExternalMetricsProvider, error) { |
| fs := flag.NewFlagSet("mock", flag.ContinueOnError) |
| var k stringValue |
| if err := k.Set(baseURL); err != nil { |
| return nil, fmt.Errorf("failed to set OAP address: %v", err) |
| } |
| fs.Var(&k, "base-url", "") |
| ctx := cli.NewContext(nil, fs, nil) |
| if url := ctx.GlobalString("base-url"); url != k.String() { |
| return nil, fmt.Errorf("failed to set base-url: %s", url) |
| } |
| provider := &externalMetricsProvider{ |
| ctx: ctx, |
| regex: metricRegex, |
| refreshRegistryInterval: refreshRegistryInterval, |
| namespace: namespace, |
| } |
| provider.sync() |
| |
| return provider, nil |
| } |
| |
| type paramValue struct { |
| key string |
| val *string |
| } |
| |
| func (pv *paramValue) extractValue(requirements labels.Requirements) error { |
| vv := make([]string, 10) |
| for _, r := range requirements { |
| if !strings.HasPrefix(r.Key(), pv.key) { |
| continue |
| } |
| kk := strings.Split(r.Key(), ".") |
| if len(kk) == 1 { |
| vv, _ = bufferEntity(vv, 0, r, nil) |
| pv.val = &vv[0] |
| return nil |
| } else if len(kk) < 3 { |
| return fmt.Errorf("invalid label key:%s", r.Key()) |
| } |
| i, err := strconv.ParseInt(kk[2], 10, 8) |
| if err != nil { |
| return fmt.Errorf("failed to parse index string to int: %v", err) |
| } |
| |
| index := int(i) |
| switch kk[1] { |
| case labelValueTypeStr: |
| vv, _ = bufferEntity(vv, index, r, nil) |
| case labelValueTypeByte: |
| vv, err = bufferEntity(vv, index, r, func(encoded string) (string, error) { |
| var bytes []byte |
| bytes, err = hex.DecodeString(encoded) |
| if err != nil { |
| return "", fmt.Errorf("failed to decode hex to string: %v", err) |
| } |
| return string(bytes), nil |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| } |
| val := strings.Join(vv, "") |
| pv.val = &val |
| return nil |
| } |
| |
| type decoder func(string) (string, error) |
| |
| func bufferEntity(buff []string, index int, requirement labels.Requirement, dec decoder) ([]string, error) { |
| if cap(buff) <= index { |
| old := buff |
| buff = make([]string, index+1) |
| copy(buff, old) |
| } |
| if v, exist := requirement.Values().PopAny(); exist { |
| if dec != nil { |
| var err error |
| buff[index], err = dec(v) |
| if err != nil { |
| return nil, err |
| } |
| } else { |
| buff[index] = v |
| } |
| } |
| return buff, nil |
| } |
| |
| func (p *externalMetricsProvider) GetExternalMetric(_ context.Context, namespace string, metricSelector labels.Selector, |
| info apiprovider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) { |
| var md *swctlapi.MetricDefinition |
| for _, m := range p.metricDefines { |
| if p.getMetricNameWithNamespace(m.Name) == info.Metric { |
| md = m |
| } |
| } |
| if md == nil { |
| klog.Errorf("%s is missing in OAP", info.Metric) |
| return nil, apierr.NewBadRequest(fmt.Sprintf("%s is defined in OAP", info.Metric)) |
| } |
| requirement, selector := metricSelector.Requirements() |
| if !selector { |
| klog.Errorf("no selector for metric: %s", md.Name) |
| return nil, apierr.NewBadRequest(fmt.Sprintf("no selector for metric: %s", md.Name)) |
| } |
| svc := ¶mValue{key: "service"} |
| label := ¶mValue{key: "label"} |
| instance := ¶mValue{key: "instance"} |
| endpoint := ¶mValue{key: "endpoint"} |
| extractValue(requirement, svc, label, instance, endpoint) |
| if *svc.val == "" { |
| klog.Errorf("%s is lack of required label 'service'", md.Name) |
| return nil, apierr.NewBadRequest(fmt.Sprintf("%s is lack of required label 'service'", md.Name)) |
| } |
| |
| now := time.Now() |
| startTime := now.Add(-3 * time.Minute) |
| endTime := now |
| step := swctlapi.StepMinute |
| duration := swctlapi.Duration{ |
| Start: startTime.Format(stepMinute), |
| End: endTime.Format(stepMinute), |
| Step: step, |
| } |
| |
| normal := true |
| empty := "" |
| entity := &swctlapi.Entity{ |
| ServiceName: svc.val, |
| ServiceInstanceName: instance.val, |
| EndpointName: endpoint.val, |
| Normal: &normal, |
| DestServiceName: &empty, |
| DestNormal: &normal, |
| DestServiceInstanceName: &empty, |
| DestEndpointName: &empty, |
| } |
| entity.Scope = parseScope(entity) |
| condition := swctlapi.MetricsCondition{ |
| Name: md.Name, |
| Entity: entity, |
| } |
| var metricsValues swctlapi.MetricsValues |
| if md.Type == swctlapi.MetricsTypeRegularValue { |
| var err error |
| metricsValues, err = metrics.LinearIntValues(p.ctx, condition, duration) |
| if err != nil { |
| return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics: %v", err)) |
| } |
| klog.V(4).Infof("Linear request{condition:%s, duration:%s} response %s", display(condition), display(duration), display(metricsValues)) |
| } else if md.Type == swctlapi.MetricsTypeLabeledValue { |
| if *label.val == "" { |
| klog.Errorf("%s is lack of required label 'label'", md.Name) |
| return nil, apierr.NewBadRequest(fmt.Sprintf("%s is lack of required label 'label'", md.Name)) |
| } |
| result, err := metrics.MultipleLinearIntValues(p.ctx, condition, []string{*label.val}, duration) |
| if err != nil { |
| return nil, apierr.NewInternalError(fmt.Errorf("unable to fetch metrics: %v", err)) |
| } |
| |
| klog.V(4).Infof("Labeled request{condition:%s, duration:%s, labels:%s} response %s", |
| display(condition), display(duration), *label.val, display(result)) |
| |
| for _, r := range result { |
| if *r.Label == *label.val { |
| metricsValues = r |
| } |
| } |
| } |
| if len(metricsValues.Values.Values) < 1 { |
| return nil, apiprovider.NewMetricNotFoundError(p.selectGroupResource(namespace), info.Metric) |
| } |
| var sTime time.Time |
| var sValue int64 |
| l := len(metricsValues.Values.Values) |
| if l < 2 { |
| return nil, apiprovider.NewMetricNotFoundError(p.selectGroupResource(namespace), info.Metric) |
| } |
| kv := metricsValues.Values.Values[l-2] |
| sTime = endTime.Add(time.Minute * time.Duration(-1)) |
| if kv.Value > 0 { |
| sValue = kv.Value |
| } |
| if sValue == 0 { |
| sTime = endTime |
| } |
| klog.V(4).Infof("metric value: %d, timestamp: %s", sValue, sTime.Format(stepMinute)) |
| |
| return &external_metrics.ExternalMetricValueList{ |
| Items: []external_metrics.ExternalMetricValue{ |
| { |
| MetricName: info.Metric, |
| Timestamp: metav1.Time{ |
| Time: sTime, |
| }, |
| Value: *resource.NewQuantity(sValue, resource.DecimalSI), |
| }, |
| }, |
| }, nil |
| } |
| |
| func extractValue(requirement labels.Requirements, paramValues ...*paramValue) { |
| for _, pv := range paramValues { |
| err := pv.extractValue(requirement) |
| if err != nil { |
| klog.Errorf("failed to parse label %s: %v ", pv.key, err) |
| } |
| } |
| } |
| |
| func (p *externalMetricsProvider) selectGroupResource(namespace string) apischema.GroupResource { |
| if namespace == "default" { |
| return NsGroupResource |
| } |
| |
| return apischema.GroupResource{ |
| Group: "", |
| Resource: "", |
| } |
| } |
| |
| func (p *externalMetricsProvider) getMetricNameWithNamespace(metricName string) string { |
| return strings.Join([]string{p.namespace, metricName}, "|") |
| } |
| |
| // TODO: remove this function once cli move it from internal module to pkg |
| func parseScope(entity *swctlapi.Entity) swctlapi.Scope { |
| scope := swctlapi.ScopeAll |
| |
| if *entity.DestEndpointName != "" { |
| scope = swctlapi.ScopeEndpointRelation |
| } else if *entity.DestServiceInstanceName != "" { |
| scope = swctlapi.ScopeServiceInstanceRelation |
| } else if *entity.DestServiceName != "" { |
| scope = swctlapi.ScopeServiceRelation |
| } else if *entity.EndpointName != "" { |
| scope = swctlapi.ScopeEndpoint |
| } else if *entity.ServiceInstanceName != "" { |
| scope = swctlapi.ScopeServiceInstance |
| } else if *entity.ServiceName != "" { |
| scope = swctlapi.ScopeService |
| } |
| |
| return scope |
| } |