blob: 77add3da327ea18c4d0f95e8ddaa36358256741c [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kube
import (
"context"
"errors"
"fmt"
"strings"
"time"
)
import (
envoyAdmin "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/protobuf/proto"
kubeCore "k8s.io/api/core/v1"
)
import (
_ "github.com/apache/dubbo-go-pixiu/pkg/config/xds"
"github.com/apache/dubbo-go-pixiu/pkg/test"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
"github.com/apache/dubbo-go-pixiu/pkg/util/protomarshal"
)
const (
proxyContainerName = "istio-proxy"
// DefaultTimeout the default timeout for the entire retry operation
defaultConfigTimeout = time.Second * 30
// DefaultDelay the default delay between successive retry attempts
defaultConfigDelay = time.Millisecond * 100
)
var _ echo.Sidecar = &sidecar{}
type sidecar struct {
podNamespace string
podName string
cluster cluster.Cluster
}
func newSidecar(pod kubeCore.Pod, cluster cluster.Cluster) *sidecar {
sidecar := &sidecar{
podNamespace: pod.Namespace,
podName: pod.Name,
cluster: cluster,
}
return sidecar
}
func (s *sidecar) Info() (*envoyAdmin.ServerInfo, error) {
msg := &envoyAdmin.ServerInfo{}
if err := s.adminRequest("server_info", msg); err != nil {
return nil, err
}
return msg, nil
}
func (s *sidecar) InfoOrFail(t test.Failer) *envoyAdmin.ServerInfo {
t.Helper()
info, err := s.Info()
if err != nil {
t.Fatal(err)
}
return info
}
func (s *sidecar) Config() (*envoyAdmin.ConfigDump, error) {
msg := &envoyAdmin.ConfigDump{}
if err := s.adminRequest("config_dump", msg); err != nil {
return nil, err
}
return msg, nil
}
func (s *sidecar) ConfigOrFail(t test.Failer) *envoyAdmin.ConfigDump {
t.Helper()
cfg, err := s.Config()
if err != nil {
t.Fatal(err)
}
return cfg
}
func (s *sidecar) WaitForConfig(accept func(*envoyAdmin.ConfigDump) (bool, error), options ...retry.Option) error {
options = append([]retry.Option{retry.BackoffDelay(defaultConfigDelay), retry.Timeout(defaultConfigTimeout)}, options...)
var cfg *envoyAdmin.ConfigDump
_, err := retry.UntilComplete(func() (result interface{}, completed bool, err error) {
cfg, err = s.Config()
if err != nil {
if strings.Contains(err.Error(), "could not resolve Any message type") {
// Unable to parse an Any in the message, likely due to missing imports.
// This is not a recoverable error.
return nil, true, nil
}
if strings.Contains(err.Error(), `Any JSON doesn't have '@type'`) {
// Unable to parse an Any in the message, likely due to an older version.
// This is not a recoverable error.
return nil, true, nil
}
return nil, false, err
}
accepted, err := accept(cfg)
if err != nil {
// Accept returned an error - retry.
return nil, false, err
}
if accepted {
// The configuration was accepted.
return nil, true, nil
}
// The configuration was rejected, don't try again.
return nil, true, errors.New("envoy config rejected")
}, options...)
if err != nil {
configDumpStr := "nil"
if cfg != nil {
b, err := protomarshal.MarshalIndent(cfg, " ")
if err == nil {
configDumpStr = string(b)
}
}
return fmt.Errorf("failed waiting for Envoy configuration: %v. Last config_dump:\n%s", err, configDumpStr)
}
return nil
}
func (s *sidecar) WaitForConfigOrFail(t test.Failer, accept func(*envoyAdmin.ConfigDump) (bool, error), options ...retry.Option) {
t.Helper()
if err := s.WaitForConfig(accept, options...); err != nil {
t.Fatal(err)
}
}
func (s *sidecar) Clusters() (*envoyAdmin.Clusters, error) {
msg := &envoyAdmin.Clusters{}
if err := s.adminRequest("clusters?format=json", msg); err != nil {
return nil, err
}
return msg, nil
}
func (s *sidecar) ClustersOrFail(t test.Failer) *envoyAdmin.Clusters {
t.Helper()
clusters, err := s.Clusters()
if err != nil {
t.Fatal(err)
}
return clusters
}
func (s *sidecar) Listeners() (*envoyAdmin.Listeners, error) {
msg := &envoyAdmin.Listeners{}
if err := s.adminRequest("listeners?format=json", msg); err != nil {
return nil, err
}
return msg, nil
}
func (s *sidecar) ListenersOrFail(t test.Failer) *envoyAdmin.Listeners {
t.Helper()
listeners, err := s.Listeners()
if err != nil {
t.Fatal(err)
}
return listeners
}
func (s *sidecar) Stats() (map[string]*dto.MetricFamily, error) {
return s.proxyStats()
}
func (s *sidecar) StatsOrFail(t test.Failer) map[string]*dto.MetricFamily {
t.Helper()
stats, err := s.Stats()
if err != nil {
t.Fatal(err)
}
return stats
}
func (s *sidecar) proxyStats() (map[string]*dto.MetricFamily, error) {
// Exec onto the pod and make a curl request to the admin port, writing
command := "pilot-agent request GET /stats/prometheus"
stdout, stderr, err := s.cluster.PodExec(s.podName, s.podNamespace, proxyContainerName, command)
if err != nil {
return nil, fmt.Errorf("failed exec on pod %s/%s: %v. Command: %s. Output:\n%s",
s.podNamespace, s.podName, err, command, stdout+stderr)
}
parser := expfmt.TextParser{}
mfMap, err := parser.TextToMetricFamilies(strings.NewReader(stdout))
if err != nil {
return nil, fmt.Errorf("failed parsing prometheus stats: %v", err)
}
return mfMap, nil
}
func (s *sidecar) adminRequest(path string, out proto.Message) error {
// Exec onto the pod and make a curl request to the admin port, writing
command := fmt.Sprintf("pilot-agent request GET %s", path)
stdout, stderr, err := s.cluster.PodExec(s.podName, s.podNamespace, proxyContainerName, command)
if err != nil {
return fmt.Errorf("failed exec on pod %s/%s: %v. Command: %s. Output:\n%s",
s.podNamespace, s.podName, err, command, stdout+stderr)
}
if err := protomarshal.UnmarshalAllowUnknown([]byte(stdout), out); err != nil {
return fmt.Errorf("failed parsing Envoy admin response from '/%s': %v\nResponse JSON: %s", path, err, stdout)
}
return nil
}
func (s *sidecar) Logs() (string, error) {
return s.cluster.PodLogs(context.TODO(), s.podName, s.podNamespace, proxyContainerName, false)
}
func (s *sidecar) LogsOrFail(t test.Failer) string {
t.Helper()
logs, err := s.Logs()
if err != nil {
t.Fatal(err)
}
return logs
}