blob: 64e59c91e9679dacac0e98d8f655e94456cda954 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stackdriver
import (
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
)
import (
"cloud.google.com/go/compute/metadata"
cloudtracepb "google.golang.org/genproto/googleapis/devtools/cloudtrace/v1"
ltype "google.golang.org/genproto/googleapis/logging/type"
loggingpb "google.golang.org/genproto/googleapis/logging/v2"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
kubeApiCore "k8s.io/api/core/v1"
)
import (
istioKube "github.com/apache/dubbo-go-pixiu/pkg/kube"
environ "github.com/apache/dubbo-go-pixiu/pkg/test/env"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/namespace"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/resource"
testKube "github.com/apache/dubbo-go-pixiu/pkg/test/kube"
"github.com/apache/dubbo-go-pixiu/pkg/test/scopes"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
)
type LogType int
const (
stackdriverNamespace = "istio-stackdriver"
stackdriverPort = 8091
)
const (
ServerAccessLog LogType = iota
ServerAuditLog
)
var (
_ Instance = &kubeComponent{}
_ io.Closer = &kubeComponent{}
)
type kubeComponent struct {
id resource.ID
ns namespace.Instance
forwarder istioKube.PortForwarder
cluster cluster.Cluster
address string
}
func newKube(ctx resource.Context, cfg Config) (Instance, error) {
c := &kubeComponent{
cluster: ctx.Clusters().GetOrDefault(cfg.Cluster),
}
c.id = ctx.TrackResource(c)
var err error
scopes.Framework.Info("=== BEGIN: Deploy Stackdriver ===")
defer func() {
if err != nil {
err = fmt.Errorf("stackdriver deployment failed: %v", err)
scopes.Framework.Infof("=== FAILED: Deploy Stackdriver ===")
_ = c.Close()
} else {
scopes.Framework.Info("=== SUCCEEDED: Deploy Stackdriver ===")
}
}()
c.ns, err = namespace.New(ctx, namespace.Config{
Prefix: stackdriverNamespace,
})
if err != nil {
return nil, fmt.Errorf("could not create %s Namespace for Stackdriver install; err:%v", stackdriverNamespace, err)
}
// apply stackdriver YAML
if err := c.cluster.ApplyYAMLFiles(c.ns.Name(), environ.StackdriverInstallFilePath); err != nil {
return nil, fmt.Errorf("failed to apply rendered %s, err: %v", environ.StackdriverInstallFilePath, err)
}
fetchFn := testKube.NewSinglePodFetch(c.cluster, c.ns.Name(), "app=stackdriver")
pods, err := testKube.WaitUntilPodsAreReady(fetchFn)
if err != nil {
return nil, err
}
pod := pods[0]
forwarder, err := c.cluster.NewPortForwarder(pod.Name, pod.Namespace, "", 0, stackdriverPort)
if err != nil {
return nil, err
}
if err := forwarder.Start(); err != nil {
return nil, err
}
c.forwarder = forwarder
scopes.Framework.Debugf("initialized stackdriver port forwarder: %v", forwarder.Address())
var svc *kubeApiCore.Service
if svc, _, err = testKube.WaitUntilServiceEndpointsAreReady(c.cluster, c.ns.Name(), "stackdriver"); err != nil {
scopes.Framework.Infof("Error waiting for Stackdriver service to be available: %v", err)
return nil, err
}
c.address = net.JoinHostPort(pod.Status.HostIP, fmt.Sprint(svc.Spec.Ports[0].NodePort))
scopes.Framework.Infof("Stackdriver address: %s NodeName %s", c.address, pod.Spec.NodeName)
return c, nil
}
func (c *kubeComponent) ListTimeSeries(_ string) ([]*monitoringpb.TimeSeries, error) {
client := http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Get("http://" + c.forwarder.Address() + "/timeseries")
if err != nil {
return []*monitoringpb.TimeSeries{}, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return []*monitoringpb.TimeSeries{}, err
}
var r monitoringpb.ListTimeSeriesResponse
err = protomarshal.Unmarshal(body, &r)
if err != nil {
return []*monitoringpb.TimeSeries{}, err
}
return trimMetricLabels(&r), nil
}
func (c *kubeComponent) ListLogEntries(lt LogType, _ string) ([]*loggingpb.LogEntry, error) {
client := http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Get("http://" + c.forwarder.Address() + "/logentries")
if err != nil {
return []*loggingpb.LogEntry{}, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return []*loggingpb.LogEntry{}, err
}
var r loggingpb.ListLogEntriesResponse
err = protomarshal.Unmarshal(body, &r)
if err != nil {
return []*loggingpb.LogEntry{}, err
}
return trimLogLabels(&r, lt), nil
}
func (c *kubeComponent) ListTraces(_ string) ([]*cloudtracepb.Trace, error) {
client := http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Get("http://" + c.forwarder.Address() + "/traces")
if err != nil {
return []*cloudtracepb.Trace{}, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return []*cloudtracepb.Trace{}, err
}
var traceResp cloudtracepb.ListTracesResponse
err = protomarshal.Unmarshal(body, &traceResp)
if err != nil {
return []*cloudtracepb.Trace{}, err
}
return traceResp.Traces, nil
}
func (c *kubeComponent) ID() resource.ID {
return c.id
}
// Close implements io.Closer.
func (c *kubeComponent) Close() error {
return nil
}
func (c *kubeComponent) GetStackdriverNamespace() string {
return c.ns.Name()
}
func (c *kubeComponent) Address() string {
return c.address
}
func trimMetricLabels(r *monitoringpb.ListTimeSeriesResponse) []*monitoringpb.TimeSeries {
var ret []*monitoringpb.TimeSeries
for _, t := range r.TimeSeries {
if t == nil {
continue
}
t.Points = nil
if metadata.OnGCE() {
// If the test runs on GCE, only remove MR fields that do not need verification
delete(t.Resource.Labels, "cluster_name")
delete(t.Resource.Labels, "location")
delete(t.Resource.Labels, "project_id")
delete(t.Resource.Labels, "pod_name")
} else {
// Otherwise remove the whole MR since it is not correctly filled on other platform yet.
t.Resource = nil
}
ret = append(ret, t)
t.Metadata = nil
}
return ret
}
func trimLogLabels(r *loggingpb.ListLogEntriesResponse, filter LogType) []*loggingpb.LogEntry {
logNameFilter := logNameSuffix(filter)
var ret []*loggingpb.LogEntry
for _, l := range r.Entries {
if l == nil {
continue
}
if !strings.HasSuffix(l.LogName, logNameFilter) {
continue
}
// Remove fields that do not need verification
l.Timestamp = nil
l.Trace = ""
l.SpanId = ""
l.LogName = ""
l.Severity = ltype.LogSeverity_DEFAULT
if l.HttpRequest != nil {
l.HttpRequest.ResponseSize = 0
l.HttpRequest.RequestSize = 0
l.HttpRequest.ServerIp = ""
l.HttpRequest.RemoteIp = ""
l.HttpRequest.UserAgent = ""
l.HttpRequest.Latency = nil
}
delete(l.Labels, "request_id")
delete(l.Labels, "source_name")
delete(l.Labels, "destination_ip")
delete(l.Labels, "destination_name")
delete(l.Labels, "connection_id")
delete(l.Labels, "upstream_host")
delete(l.Labels, "connection_state")
delete(l.Labels, "source_ip")
delete(l.Labels, "source_port")
delete(l.Labels, "total_sent_bytes")
delete(l.Labels, "total_received_bytes")
ret = append(ret, l)
}
return ret
}
func logNameSuffix(filter LogType) string {
switch filter {
case ServerAuditLog:
return "server-istio-audit-log"
case ServerAccessLog:
return "server-accesslog-stackdriver"
}
return ""
}