blob: deb8784945b215f0e6cf9060c7590566fa479514 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build integ
// +build integ
package stackdriver
import (
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
)
import (
"cloud.google.com/go/compute/metadata"
"google.golang.org/genproto/googleapis/devtools/cloudtrace/v1"
loggingpb "google.golang.org/genproto/googleapis/logging/v2"
monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
"google.golang.org/protobuf/proto"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
"github.com/apache/dubbo-go-pixiu/pkg/test"
"github.com/apache/dubbo-go-pixiu/pkg/test/env"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo/deployment"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo/match"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/gcemetadata"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/istio"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/namespace"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/stackdriver"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/resource"
"github.com/apache/dubbo-go-pixiu/pkg/test/scopes"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/tmpl"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
"github.com/apache/dubbo-go-pixiu/tests/integration/telemetry"
)
// Testdata template paths and fixed names used by the Stackdriver telemetry
// integration test helpers.
const (
// stackdriverBootstrapOverride is the template that TestSetup joins onto
// env.IstioSrc, evaluates, and applies in the echo namespace.
stackdriverBootstrapOverride = "tests/integration/telemetry/stackdriver/testdata/custom_bootstrap.yaml.tmpl"
// serverRequestCount, clientRequestCount and serverLogEntry are testdata
// template paths. They are not referenced in the code visible here;
// presumably they are passed to ValidateMetrics / ValidateLogs elsewhere in
// the package — confirm against callers.
serverRequestCount = "tests/integration/telemetry/stackdriver/testdata/server_request_count.json.tmpl"
clientRequestCount = "tests/integration/telemetry/stackdriver/testdata/client_request_count.json.tmpl"
serverLogEntry = "tests/integration/telemetry/stackdriver/testdata/server_access_log.json.tmpl"
// sdBootstrapConfigMap is the value TestSetup puts in the
// echo.SidecarBootstrapOverride annotation of every echo subset.
sdBootstrapConfigMap = "stackdriver-bootstrap-config"
// FakeGCEMetadataServerValues is a YAML fragment that stops right after the
// "GCE_METADATA_HOST: " key; presumably callers append the fake metadata
// server address to it — verify against the users of this constant.
// NOTE(review): the nested keys carry no indentation in this literal —
// confirm that this is intended and not lost formatting.
FakeGCEMetadataServerValues = `
defaultConfig:
proxyMetadata:
GCE_METADATA_HOST: `
)
// Package-level fixtures shared by the helpers in this file.
var (
// Ist is not assigned in the code visible here; presumably it is populated
// by the suite setup with the Istio installation under test — confirm.
Ist istio.Instance
// EchoNsInst is the namespace created by TestSetup; the echo workloads are
// deployed into it and the validation helpers query Stackdriver by its name.
EchoNsInst namespace.Instance
// GCEInst is assigned by ConditionallySetupMetadataServer, and only when
// metadata.OnGCE() reports false.
GCEInst gcemetadata.Instance
// SDInst is the Stackdriver component created by TestSetup; ValidateMetrics
// and ValidateLogEntry read time series and log entries from it.
SDInst stackdriver.Instance
// Srv holds the echo instances of service "srv", selected in TestSetup.
Srv echo.Instances
// Clt holds the echo instances whose service name starts with "clt",
// selected in TestSetup.
Clt echo.Instances
)
// TestSetup prepares the shared fixtures used by the Stackdriver telemetry
// helpers in this file.
//
// In order, it creates a namespace with the "istio-echo" prefix and Inject set
// (stored in EchoNsInst), creates the Stackdriver component (stored in SDInst),
// evaluates and applies the bootstrap-override template in that namespace, and
// registers a "clt-<cluster name>" client and a "srv" server for every cluster
// in ctx. After the deployment is built, the instances are stored in Clt and
// Srv. The first error encountered is returned unchanged.
func TestSetup(ctx resource.Context) (err error) {
	if EchoNsInst, err = namespace.New(ctx, namespace.Config{
		Prefix: "istio-echo",
		Inject: true,
	}); err != nil {
		return err
	}
	if SDInst, err = stackdriver.New(ctx, stackdriver.Config{}); err != nil {
		return err
	}

	kubeConfig := ctx.ConfigKube()
	templateArgs := map[string]interface{}{
		"StackdriverAddress": SDInst.Address(),
		"EchoNamespace":      EchoNsInst.Name(),
		"UseRealSD":          stackdriver.UseRealStackdriver(),
	}
	bootstrapTemplate := filepath.Join(env.IstioSrc, stackdriverBootstrapOverride)
	if err = kubeConfig.EvalFile(EchoNsInst.Name(), templateArgs, bootstrapTemplate).Apply(); err != nil {
		return err
	}

	// bootstrapSubsets returns a fresh single-subset list whose only subset is
	// annotated with the sidecar bootstrap override; every workload uses it.
	bootstrapSubsets := func() []echo.SubsetConfig {
		return []echo.SubsetConfig{
			{
				Annotations: map[echo.Annotation]*echo.AnnotationValue{
					echo.SidecarBootstrapOverride: {Value: sdBootstrapConfigMap},
				},
			},
		}
	}

	builder := deployment.New(ctx)
	for _, cls := range ctx.Clusters() {
		client := echo.Config{
			Service:   fmt.Sprintf("clt-%s", cls.Name()),
			Cluster:   cls,
			Namespace: EchoNsInst,
			Subsets:   bootstrapSubsets(),
		}
		server := echo.Config{
			Service:   "srv",
			Cluster:   cls,
			Namespace: EchoNsInst,
			// All workload ports are above 1024 so that root is not required.
			Ports: []echo.Port{
				{Name: "grpc", Protocol: protocol.GRPC, WorkloadPort: 7070},
				{Name: "http", Protocol: protocol.HTTP, WorkloadPort: 8888},
				{Name: "tcp", Protocol: protocol.TCP, WorkloadPort: 9000},
			},
			Subsets: bootstrapSubsets(),
		}
		builder.WithConfig(client).WithConfig(server)
	}

	echos, err := builder.Build()
	if err != nil {
		return err
	}

	// Client services are named per cluster, so they are selected by prefix;
	// the server shares one name across clusters and is selected by name.
	isClient := match.Matcher(func(inst echo.Instance) bool {
		return strings.HasPrefix(inst.Config().Service, "clt")
	})
	Clt = isClient.GetMatches(echos)
	Srv = match.ServiceName(echo.NamespacedName{Name: "srv", Namespace: EchoNsInst}).GetMatches(echos)
	return nil
}
// SendTraffic sends batches of requests from cltInstance to the Srv instances.
//
// When onlyTCP is true, one batch goes to the "tcp" port and its error is
// returned. Otherwise one batch goes to the "grpc" port and then one to the
// "http" port; only the HTTP batch carries headers (the original note says the
// HTTP request is sent with forced tracing, so callers presumably pass the
// tracing headers here). Retries are disabled for every batch, and the first
// error is returned.
func SendTraffic(cltInstance echo.Instance, headers http.Header, onlyTCP bool) error {
	// All server instances share one service name, so Srv as a whole is the
	// target. The batch size scales with the number of server workloads so
	// that load balancing gets a chance to reach every cluster.
	callCount := telemetry.RequestCountMultipler * Srv.MustWorkloads().Len()

	// callPort sends one batch to the named port of Srv.
	callPort := func(portName string, httpCfg echo.HTTP) error {
		_, err := cltInstance.Call(echo.CallOptions{
			To:    Srv,
			Port:  echo.Port{Name: portName},
			HTTP:  httpCfg,
			Count: callCount,
			Retry: echo.Retry{NoRetry: true},
		})
		return err
	}

	if onlyTCP {
		return callPort("tcp", echo.HTTP{})
	}
	if err := callPort("grpc", echo.HTTP{}); err != nil {
		return err
	}
	return callPort("http", echo.HTTP{Headers: headers})
}
// ValidateMetrics checks that Stackdriver holds the expected server and client
// time series for cluster clName.
//
// serverReqCount and clientReqCount are paths of template files that are
// rendered with the echo namespace, clName and trustDomain and unmarshalled
// into the wanted time series. The time series listed for the echo namespace
// are then compared to both with proto.Equal.
//
// An error is returned when a template cannot be loaded, when the time series
// cannot be listed, or when the wanted server or client series is absent. In
// the last case the per-label differences are logged through t first.
func ValidateMetrics(t framework.TestContext, serverReqCount, clientReqCount, clName, trustDomain string) error {
	t.Helper()

	var wantClient, wantServer monitoring.TimeSeries
	if err := unmarshalFromTemplateFile(serverReqCount, &wantServer, clName, trustDomain); err != nil {
		return fmt.Errorf("metrics: error generating wanted server request: %w", err)
	}
	if err := unmarshalFromTemplateFile(clientReqCount, &wantClient, clName, trustDomain); err != nil {
		return fmt.Errorf("metrics: error generating wanted client request: %w", err)
	}

	// Traverse all time series received and compare with expected client and server time series.
	ts, err := SDInst.ListTimeSeries(EchoNsInst.Name())
	if err != nil {
		return fmt.Errorf("metrics: error getting time-series from Stackdriver: %w", err)
	}
	t.Logf("number of timeseries: %v", len(ts))

	// The generated getters tolerate a nil Metric, so a template or a received
	// series without a metric cannot cause a nil-pointer panic here.
	wantServerType := wantServer.GetMetric().GetType()
	wantClientType := wantClient.GetMetric().GetType()

	var gotServer, gotClient bool
	for _, tt := range ts {
		if tt == nil {
			continue
		}
		// Cheap pre-filter: skip series of unrelated metric types before the
		// full message comparison.
		gotType := tt.GetMetric().GetType()
		if gotType != wantClientType && gotType != wantServerType {
			continue
		}
		if proto.Equal(tt, &wantServer) {
			gotServer = true
		}
		if proto.Equal(tt, &wantClient) {
			gotClient = true
		}
	}

	if !gotServer {
		LogMetricsDiff(t, &wantServer, ts)
		return fmt.Errorf("metrics: did not get expected server metrics for cluster %s", clName)
	}
	if !gotClient {
		LogMetricsDiff(t, &wantClient, ts)
		return fmt.Errorf("metrics: did not get expected client metrics for cluster %s", clName)
	}
	return nil
}
// unmarshalFromTemplateFile reads the template stored at file, evaluates it,
// and unmarshals the rendered text into out.
//
// The template sees four values: EchoNamespace (the name of EchoNsInst),
// ClusterName (clName), TrustDomain (trustDomain) and OnGCE (the result of
// metadata.OnGCE()). Read, evaluation and unmarshalling errors are returned
// as-is.
func unmarshalFromTemplateFile(file string, out proto.Message, clName, trustDomain string) error {
	raw, err := os.ReadFile(file)
	if err != nil {
		return err
	}

	params := map[string]interface{}{
		"EchoNamespace": EchoNsInst.Name(),
		"ClusterName":   clName,
		"TrustDomain":   trustDomain,
		"OnGCE":         metadata.OnGCE(),
	}
	rendered, err := tmpl.Evaluate(string(raw), params)
	if err != nil {
		return err
	}

	return protomarshal.Unmarshal([]byte(rendered), out)
}
// ConditionallySetupMetadataServer creates the fake GCE metadata server and
// stores it in GCEInst, but only when metadata.OnGCE() reports that the code is
// not running on GCE. On GCE nothing is created and nil is returned.
//
// The error from gcemetadata.New, if any, is returned.
func ConditionallySetupMetadataServer(ctx resource.Context) (err error) {
	// TODO: this checks the machine the test process runs on. It would not work
	// if the host and the test cluster differ.
	if metadata.OnGCE() {
		scopes.Framework.Infof("On GCE, skip setup of fake GCE metadata server")
		return nil
	}

	scopes.Framework.Infof("Not on GCE, setup fake GCE metadata server")
	GCEInst, err = gcemetadata.New(ctx, gcemetadata.Config{})
	return err
}
// ValidateLogs renders the log-entry template at srvLogEntry for clName and
// trustDomain and delegates to ValidateLogEntry to look for that entry among
// the log entries selected by filter.
//
// It returns an error when the template cannot be turned into a log entry, or
// whatever ValidateLogEntry returns.
func ValidateLogs(t test.Failer, srvLogEntry, clName, trustDomain string, filter stackdriver.LogType) error {
	wantLog := new(loggingpb.LogEntry)
	err := unmarshalFromTemplateFile(srvLogEntry, wantLog, clName, trustDomain)
	if err != nil {
		return fmt.Errorf("logs: failed to parse wanted log entry: %v", err)
	}
	return ValidateLogEntry(t, wantLog, filter)
}
// ValidateLogEntry looks for want among the log entries that SDInst returns for
// filter and the echo namespace.
//
// Each received entry has its Trace and SpanId cleared in place before being
// compared with proto.Equal, so these two fields never influence the match
// (presumably because they vary per request). It returns nil on the first
// match. When nothing matches, the differences are logged through t and an
// error is returned. A failure to list the entries is also reported as an
// error.
func ValidateLogEntry(t test.Failer, want *loggingpb.LogEntry, filter stackdriver.LogType) error {
	entries, err := SDInst.ListLogEntries(filter, EchoNsInst.Name())
	if err != nil {
		return fmt.Errorf("logs: failed to get received log entries: %v", err)
	}

	for idx := range entries {
		got := entries[idx]
		got.Trace, got.SpanId = "", ""
		if proto.Equal(got, want) {
			return nil
		}
	}

	LogAccessLogsDiff(t, want, entries)
	return fmt.Errorf("logs: did not get expected log entry")
}
// LogAccessLogsDiff flattens wantRaw and every entry with normalizeLogs and
// logs, through t, how the entries differ from the wanted one. The report is
// labelled "access log".
func LogAccessLogsDiff(t test.Failer, wantRaw *loggingpb.LogEntry, entries []*loggingpb.LogEntry) {
	wanted := normalizeLogs(wantRaw)
	received := make([]map[string]string, len(entries))
	for i, entry := range entries {
		received[i] = normalizeLogs(entry)
	}
	logDiff(t, "access log", wanted, received)
}
// LogTraceDiff flattens wantRaw and every entry with normalizeTrace and logs,
// through t, how the entries differ from the wanted one. The report is
// labelled "trace".
func LogTraceDiff(t test.Failer, wantRaw *cloudtrace.Trace, entries []*cloudtrace.Trace) {
	wanted := normalizeTrace(wantRaw)
	received := make([]map[string]string, len(entries))
	for i, entry := range entries {
		received[i] = normalizeTrace(entry)
	}
	logDiff(t, "trace", wanted, received)
}
// LogMetricsDiff flattens wantRaw and every entry with normalizeMetrics and
// logs, through t, how the entries differ from the wanted one. The report is
// labelled "metrics".
func LogMetricsDiff(t test.Failer, wantRaw *monitoring.TimeSeries, entries []*monitoring.TimeSeries) {
	wanted := normalizeMetrics(wantRaw)
	received := make([]map[string]string, len(entries))
	for i, entry := range entries {
		received[i] = normalizeMetrics(entry)
	}
	logDiff(t, "metrics", wanted, received)
}
// logDiff logs, through t, how the flattened entries differ from query.
//
// tp names the kind of entry in the messages. Duplicate entries are considered
// once. For every distinct entry, only the keys present in query are compared;
// a key whose received value is empty (or absent) is reported as missing.
// Entries are reported in order of increasing number of mismatching keys, and
// the keys of each entry are reported in sorted order so that the output is
// stable between runs.
//
// Nothing beyond a short note is logged when entries is empty or when every
// distinct entry matches query.
func logDiff(t test.Failer, tp string, query map[string]string, entries []map[string]string) {
	if len(entries) == 0 {
		t.Logf("no %v entries found", tp)
		return
	}

	allMismatches := []map[string]string{}
	seen := sets.New()
	for _, s := range entries {
		// The JSON form is the dedup key: encoding/json emits map keys in
		// sorted order, and marshalling a map[string]string cannot fail, so
		// the error is deliberately ignored.
		b, _ := json.Marshal(s)
		ss := string(b)
		if seen.Contains(ss) {
			continue
		}
		seen.Insert(ss)

		misMatched := map[string]string{}
		for k, want := range query {
			if got := s[k]; want != got {
				misMatched[k] = got
			}
		}
		if len(misMatched) == 0 {
			continue
		}
		allMismatches = append(allMismatches, misMatched)
	}
	if len(allMismatches) == 0 {
		t.Log("no diff found")
		return
	}

	t.Logf("query for %s returned %d entries (%d distinct), but none matched our query exactly.", tp, len(entries), len(seen))
	// Closest candidates first; the stable sort keeps arrival order among
	// entries with the same number of mismatches.
	sort.SliceStable(allMismatches, func(i, j int) bool {
		return len(allMismatches[i]) < len(allMismatches[j])
	})
	for i, m := range allMismatches {
		t.Logf("Entry %d)", i)

		// Map iteration order is random; sort the keys for reproducible logs.
		keys := make([]string, 0, len(m))
		for k := range m {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		var missing []string
		for _, k := range keys {
			if v := m[k]; v == "" {
				missing = append(missing, k)
			} else {
				t.Logf(" for label %q, wanted %q but got %q", k, query[k], v)
			}
		}
		if len(missing) > 0 {
			t.Logf(" missing labels: %v", missing)
		}
	}
}
// normalizeLogs flattens a log entry into a string map for diffing.
//
// The result always has "traceSampled". When the entry carries an HTTP request,
// each of its fields is stored under an "http."-prefixed key, with non-string
// fields rendered by fmt.Sprint. Every entry label is stored under
// "labels.<label key>".
func normalizeLogs(l *loggingpb.LogEntry) map[string]string {
	flat := map[string]string{
		"traceSampled": fmt.Sprint(l.TraceSampled),
	}

	if req := l.HttpRequest; req != nil {
		flat["http.RequestMethod"] = req.RequestMethod
		flat["http.RequestUrl"] = req.RequestUrl
		flat["http.RequestSize"] = fmt.Sprint(req.RequestSize)
		flat["http.Status"] = fmt.Sprint(req.Status)
		flat["http.ResponseSize"] = fmt.Sprint(req.ResponseSize)
		flat["http.UserAgent"] = req.UserAgent
		flat["http.RemoteIp"] = req.RemoteIp
		flat["http.ServerIp"] = req.ServerIp
		flat["http.Referer"] = req.Referer
		flat["http.Latency"] = fmt.Sprint(req.Latency)
		flat["http.CacheLookup"] = fmt.Sprint(req.CacheLookup)
		flat["http.CacheHit"] = fmt.Sprint(req.CacheHit)
		flat["http.CacheValidatedWithOriginServer"] = fmt.Sprint(req.CacheValidatedWithOriginServer)
		flat["http.CacheFillBytes"] = fmt.Sprint(req.CacheFillBytes)
		flat["http.Protocol"] = req.Protocol
	}

	for name, value := range l.Labels {
		flat["labels."+name] = value
	}
	return flat
}
// normalizeMetrics flattens a time series into a string map for diffing.
//
// Every metric label is stored under "metric.labels.<label key>" and the
// metric type under "metric.type". The generated getters are used so that a nil
// time series or a nil Metric yields an empty type and no labels instead of a
// nil-pointer panic. This matters because ValidateMetrics skips nil series when
// matching, but hands the unfiltered list to the diff.
func normalizeMetrics(l *monitoring.TimeSeries) map[string]string {
	m := l.GetMetric()
	labels := m.GetLabels()

	r := make(map[string]string, len(labels)+1)
	for k, v := range labels {
		r["metric.labels."+k] = v
	}
	r["metric.type"] = m.GetType()
	return r
}
// normalizeTrace flattens a trace into a string map for diffing.
//
// The project ID is stored under "projectId". Every span label is stored under
// "span[<span index>-<span name>].<label key>".
func normalizeTrace(l *cloudtrace.Trace) map[string]string {
	flat := map[string]string{"projectId": l.ProjectId}
	for idx, span := range l.Spans {
		prefix := fmt.Sprintf("span[%d-%s].", idx, span.Name)
		for label, value := range span.Labels {
			flat[prefix+label] = value
		}
	}
	return flat
}