blob: dfb5c4ec2a99631dc55caf76358ab5cdef1b96df [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package istio
import (
"context"
"encoding/json"
"fmt"
"net"
"strconv"
"time"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/http/headers"
"github.com/apache/dubbo-go-pixiu/pkg/test"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/common/scheme"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo/common"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/environment/kube"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/istio/ingress"
"github.com/apache/dubbo-go-pixiu/pkg/test/framework/resource"
"github.com/apache/dubbo-go-pixiu/pkg/test/scopes"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)
// Default names, labels and ports used when locating and querying Istio gateways.
const (
// defaultIngressIstioLabel is the "istio" label value newIngress falls back to
// when the config does not set one.
defaultIngressIstioLabel = "ingressgateway"
// defaultIngressServiceName is the Service name newIngress falls back to when
// the config does not set one.
defaultIngressServiceName = "istio-" + defaultIngressIstioLabel
// eastWestIngressIstioLabel and eastWestIngressServiceName are the matching
// label/Service name pair for the east-west gateway.
eastWestIngressIstioLabel = "eastwestgateway"
eastWestIngressServiceName = "istio-" + eastWestIngressIstioLabel
// proxyContainerName is the container adminRequest execs into.
proxyContainerName = "istio-proxy"
// proxyAdminPort is the localhost port adminRequest curls inside that container.
proxyAdminPort = 15000
// discoveryPort is the service port resolved by DiscoveryAddress.
discoveryPort = 15012
)
var (
// getAddressTimeout and getAddressDelay are the retry options getAddressInner
// passes to retry.UntilComplete while waiting for the gateway address.
getAddressTimeout = retry.Timeout(3 * time.Minute)
getAddressDelay = retry.BackoffDelay(500 * time.Millisecond)
// Compile-time check that *ingressImpl implements ingress.Instance.
_ ingress.Instance = &ingressImpl{}
)
// ingressConfig describes which ingress gateway newIngress should wrap.
// An empty ServiceName or IstioLabel is replaced by the package defaults
// in newIngress.
type ingressConfig struct {
// ServiceName is the kubernetes Service name for the cluster
ServiceName string
// Namespace the ingress can be found in
Namespace string
// IstioLabel is the value for the "istio" label on the ingress kubernetes objects
IstioLabel string
// Cluster to be used in a multicluster environment
Cluster cluster.Cluster
}
// newIngress builds an ingress.Instance for the gateway described by cfg.
// A blank service name or istio label is replaced by the default ingress
// gateway values. The context's environment must be a *kube.Environment;
// the type assertion panics otherwise.
func newIngress(ctx resource.Context, cfg ingressConfig) (i ingress.Instance) {
	svcName, label := cfg.ServiceName, cfg.IstioLabel
	if svcName == "" {
		svcName = defaultIngressServiceName
	}
	if label == "" {
		label = defaultIngressIstioLabel
	}
	return &ingressImpl{
		serviceName: svcName,
		istioLabel:  label,
		namespace:   cfg.Namespace,
		env:         ctx.Environment().(*kube.Environment),
		cluster:     ctx.Clusters().GetOrDefault(cfg.Cluster),
	}
}
// ingressImpl is the ingress.Instance implementation created by newIngress.
type ingressImpl struct {
// serviceName is the Service name passed to getRemoteServiceAddress.
serviceName string
// istioLabel is the "istio" label value passed to getRemoteServiceAddress.
istioLabel string
// namespace is the namespace used for address lookups and pod listings.
namespace string
// env is the kube environment; it supplies settings and the default cluster.
env *kube.Environment
// cluster is the cluster resolved from the config (or the default one).
cluster cluster.Cluster
}
// getAddressInner resolves the externally reachable host and port for the given service port.
// The lookup is retried with getAddressTimeout/getAddressDelay until it completes. When we
// don't have support for LoadBalancer, the result is the externally reachable NodePort
// address and port.
func (c *ingressImpl) getAddressInner(port int) (string, int, error) {
	tries := 0
	lookup := func() (interface{}, bool, error) {
		tries++
		res, done, err := getRemoteServiceAddress(c.env.Settings(), c.cluster, c.namespace, c.istioLabel, c.serviceName, port)
		// Stay quiet on the first failure but log every later one: load balancer
		// provisioning can be slow, so the timeout is long, and without any
		// output the test would appear to hang.
		if tries > 1 && err != nil {
			scopes.Framework.Warnf("failed to get address for port %v: %v", port, err)
		}
		return res, done, err
	}

	resolved, err := retry.UntilComplete(lookup, getAddressTimeout, getAddressDelay)
	if err != nil {
		return "", 0, err
	}

	// The lookup yields either a ready-made TCP address or a "host:port" string.
	if tcp, ok := resolved.(net.TCPAddr); ok {
		return tcp.IP.String(), tcp.Port, nil
	}
	hostPort, ok := resolved.(string)
	if !ok {
		return "", 0, fmt.Errorf("failed to get address for port %v", port)
	}

	host, rawPort, err := net.SplitHostPort(hostPort)
	if err != nil {
		return "", 0, err
	}
	mapped, err := strconv.Atoi(rawPort)
	if err != nil {
		return "", 0, err
	}
	return host, mapped, nil
}
// AddressForPort returns the externally reachable host and port of the component
// for the given service port. Lookup failures are logged to the framework scope
// and reported to the caller as an empty host and a zero port.
func (c *ingressImpl) AddressForPort(port int) (string, int) {
	host, mappedPort, lookupErr := c.getAddressInner(port)
	if lookupErr != nil {
		scopes.Framework.Error(lookupErr)
		return "", 0
	}
	return host, mappedPort
}
// Cluster returns the cluster associated with this ingress instance.
func (c *ingressImpl) Cluster() cluster.Cluster {
return c.cluster
}
// HTTPAddress returns the externally reachable HTTP host and port of the component,
// resolved for service port 80 via AddressForPort.
func (c *ingressImpl) HTTPAddress() (string, int) {
return c.AddressForPort(80)
}
// TCPAddress returns the externally reachable TCP host and port of the component,
// resolved for service port 31400 via AddressForPort.
func (c *ingressImpl) TCPAddress() (string, int) {
return c.AddressForPort(31400)
}
// HTTPSAddress returns the externally reachable HTTPS host and port of the component,
// resolved for service port 443 via AddressForPort.
func (c *ingressImpl) HTTPSAddress() (string, int) {
return c.AddressForPort(443)
}
// DiscoveryAddress returns the externally reachable discovery address (15012) of the component.
// A zero-valued net.TCPAddr is returned when the resolved host is not a literal IP address,
// which includes the case where address resolution failed and the host is empty.
func (c *ingressImpl) DiscoveryAddress() net.TCPAddr {
	host, port := c.AddressForPort(discoveryPort)
	// net.ParseIP reports an unparsable host by returning nil; check that
	// directly instead of comparing the formatted string against "<nil>".
	ip := net.ParseIP(host)
	if ip == nil {
		// TODO support hostname based discovery address
		return net.TCPAddr{}
	}
	return net.TCPAddr{IP: ip, Port: port}
}
// Call sends an echo request through this ingress using the given options.
// It delegates to callEcho and returns its result and error unchanged.
func (c *ingressImpl) Call(options echo.CallOptions) (echo.CallResult, error) {
return c.callEcho(options)
}
// CallOrFail behaves like Call, but reports any error through t.Fatal
// instead of returning it.
func (c *ingressImpl) CallOrFail(t test.Failer, options echo.CallOptions) echo.CallResult {
	t.Helper()
	result, callErr := c.Call(options)
	if callErr != nil {
		t.Fatal(callErr)
	}
	return result
}
// callEcho sends an echo request through this ingress gateway.
//
// It works on the copy returned by opts.DeepCopy(). When opts.Port.ServicePort
// is zero the target address is chosen from the scheme (HTTP, HTTPS or TCP);
// otherwise the address for that service port is looked up. The resolved port
// always overwrites opts.Port.ServicePort, while the resolved host is only
// used when opts.Address is empty.
//
// An error is returned when no scheme can be determined or the scheme is not
// supported; otherwise the result of common.CallEcho is returned. A failed
// address lookup is only logged as a warning and the call is still attempted.
func (c *ingressImpl) callEcho(opts echo.CallOptions) (echo.CallResult, error) {
	var (
		addr string
		port int
	)
	opts = opts.DeepCopy()

	if opts.Port.ServicePort == 0 {
		s, err := c.schemeFor(opts)
		if err != nil {
			return echo.CallResult{}, err
		}
		opts.Scheme = s

		// Default port based on protocol
		switch s {
		case scheme.HTTP:
			addr, port = c.HTTPAddress()
		case scheme.HTTPS:
			addr, port = c.HTTPSAddress()
		case scheme.TCP:
			addr, port = c.TCPAddress()
		default:
			// %+v (not "%v+") so the options are printed with field names.
			return echo.CallResult{}, fmt.Errorf("ingress: scheme %v not supported. Options: %+v", s, opts)
		}
	} else {
		addr, port = c.AddressForPort(opts.Port.ServicePort)
	}

	if addr == "" || port == 0 {
		scopes.Framework.Warnf("failed to get host and port for %s/%d", opts.Port.Protocol, opts.Port.ServicePort)
	}

	// Even if they set ServicePort, when load balancer is disabled, we may need to switch to NodePort, so replace it.
	opts.Port.ServicePort = port
	if len(opts.Address) == 0 {
		// Default address based on port
		opts.Address = addr
	}

	// Make sure the header map exists before the Host header is written to it.
	if opts.HTTP.Headers == nil {
		opts.HTTP.Headers = map[string][]string{}
	}
	if host := opts.GetHost(); len(host) > 0 {
		opts.HTTP.Headers.Set(headers.Host, host)
	}

	// Route through the cluster's HTTP proxy when one is configured.
	if len(c.cluster.HTTPProxy()) > 0 {
		opts.HTTP.HTTPProxy = c.cluster.HTTPProxy()
	}
	return common.CallEcho(c, opts)
}
// schemeFor picks the scheme for a call: an explicitly set opts.Scheme wins,
// otherwise the scheme is derived from the port via opts.Port.Scheme().
// It is an error for both the scheme and the port protocol to be empty.
func (c *ingressImpl) schemeFor(opts echo.CallOptions) (scheme.Instance, error) {
	switch {
	case opts.Scheme != "":
		return opts.Scheme, nil
	case opts.Port.Protocol == "":
		return "", fmt.Errorf("must provide either protocol or scheme")
	default:
		return opts.Port.Scheme()
	}
}
// ProxyStats fetches "stats?format=json" from the gateway proxy's admin port
// and returns the decoded stats as a name-to-value map. A nil map and an
// error are returned when the admin request fails.
func (c *ingressImpl) ProxyStats() (map[string]int, error) {
	rawStats, reqErr := c.adminRequest("stats?format=json")
	if reqErr != nil {
		return nil, fmt.Errorf("failed to get response from admin port: %v", reqErr)
	}
	return c.unmarshalStats(rawStats)
}
// PodID returns the name of the i-th pod backing this ingress gateway.
//
// Pods are listed in c.namespace on the environment's default cluster using
// the "istio=<istioLabel>" selector. An error is returned when the listing
// fails or when i is outside [0, number of pods).
func (c *ingressImpl) PodID(i int) (string, error) {
	// Select by the configured label rather than a hard-coded
	// "istio=ingressgateway", so a gateway created with a different IstioLabel
	// resolves to its own pods. With the default label the selector is unchanged.
	selector := "istio=" + c.istioLabel
	// NOTE(review): this queries the default cluster, not c.cluster — confirm
	// that is intended for multicluster setups.
	pods, err := c.env.Clusters().Default().PodsForSelector(context.TODO(), c.namespace, selector)
	if err != nil {
		return "", fmt.Errorf("unable to list ingress gateway pods: %w", err)
	}
	if i < 0 || i >= len(pods.Items) {
		return "", fmt.Errorf("pod index out of boundary (%d): %d", len(pods.Items), i)
	}
	return pods.Items[i].Name, nil
}
// adminRequest makes a call to admin port at ingress gateway proxy and returns error on request failure.
//
// The request is issued with curl from inside the proxy container of the first
// pod matching the "istio=<istioLabel>" selector in c.namespace. The returned
// string is the command's stdout followed by its stderr. An error is returned
// when the pods cannot be listed, when no pod matches, or when the exec fails.
func (c *ingressImpl) adminRequest(path string) (string, error) {
	// Select by the configured label rather than a hard-coded
	// "istio=ingressgateway"; with the default label the selector is unchanged.
	selector := "istio=" + c.istioLabel
	pods, err := c.env.Clusters().Default().PodsForSelector(context.TODO(), c.namespace, selector)
	if err != nil {
		return "", fmt.Errorf("unable to get ingressImpl gateway stats: %v", err)
	}
	// Guard the Items[0] access below: an empty result would otherwise panic.
	if len(pods.Items) == 0 {
		return "", fmt.Errorf("no ingress gateway pods found in namespace %q for selector %q", c.namespace, selector)
	}
	podNs, podName := pods.Items[0].Namespace, pods.Items[0].Name

	// Exec onto the pod and make a curl request to the admin port
	command := fmt.Sprintf("curl http://127.0.0.1:%d/%s", proxyAdminPort, path)
	stdout, stderr, err := c.env.Clusters().Default().PodExec(podName, podNs, proxyContainerName, command)
	return stdout + stderr, err
}
// statEntry is one element of the "stats" array in the JSON decoded by
// unmarshalStats. Value is kept as a json.Number so the raw numeric text is
// preserved until it is converted.
type statEntry struct {
Name string `json:"name"`
Value json.Number `json:"value"`
}
// stats is the top-level JSON document decoded by unmarshalStats: an object
// whose "stats" field holds the list of entries.
type stats struct {
StatList []statEntry `json:"stats"`
}
// unmarshalStats unmarshals Envoy stats from JSON format into a map, where stats name is
// key, and stats value is value.
//
// Entries without a value are skipped. Integer values are converted exactly;
// any other numeric value is truncated toward zero. An error is returned when
// the JSON cannot be decoded or when a value cannot be represented as a
// number; on error the returned map carries no usable data.
func (c *ingressImpl) unmarshalStats(statsJSON string) (map[string]int, error) {
	statsMap := make(map[string]int)
	var statsArray stats
	if err := json.Unmarshal([]byte(statsJSON), &statsArray); err != nil {
		return statsMap, fmt.Errorf("unable to unmarshal stats from json: %v", err)
	}
	for _, v := range statsArray.StatList {
		if v.Value == "" {
			continue
		}
		// Prefer the integer parse: going through float64 silently loses
		// precision for counters larger than 2^53.
		if n, err := v.Value.Int64(); err == nil {
			statsMap[v.Name] = int(n)
			continue
		}
		f, err := v.Value.Float64()
		if err != nil {
			// Previously ignored, which stored a meaningless value.
			return nil, fmt.Errorf("unable to convert value %q of stat %q: %w", v.Value.String(), v.Name, err)
		}
		statsMap[v.Name] = int(f)
	}
	return statsMap, nil
}
// Namespace returns the namespace this ingress was configured with.
func (c *ingressImpl) Namespace() string {
return c.namespace
}