// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deployment
import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/hashicorp/go-multierror"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/apache/dubbo-go-pixiu/pkg/kube/inject"
	"github.com/apache/dubbo-go-pixiu/pkg/test"
	"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/cluster"
	"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo"
	"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo/kube"
	_ "github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/echo/staticvm"
	"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/istio"
	"github.com/apache/dubbo-go-pixiu/pkg/test/framework/components/namespace"
	"github.com/apache/dubbo-go-pixiu/pkg/test/framework/resource"
	"github.com/apache/dubbo-go-pixiu/pkg/test/scopes"
	"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
// Builder for a group of collaborating Echo Instances. Once built, all Instances in the
// group:
//
// 1. Are ready to receive traffic, and
// 2. Can call every other Instance in the group (i.e. have received Envoy config
// from Pilot).
//
// If a test needs to verify that one Instance is NOT reachable from another, there are
// a couple of options:
//
// 1. Build a group while all Instances ARE reachable. Then apply a policy
// disallowing the communication.
// 2. Build the source and destination Instances in separate groups and then
// call `source.WaitUntilCallable(destination)`.
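//
// A minimal usage sketch (ctx is the test's resource.Context and appNS an existing
// namespace.Instance; both are illustrative, not provided by this package):
//
//	var client, server echo.Instance
//	echos, err := deployment.New(ctx).
//		With(&client, echo.Config{Service: "client", Namespace: appNS}).
//		With(&server, echo.Config{Service: "server", Namespace: appNS}).
//		Build()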
type Builder interface {
// With adds a new Echo configuration to the Builder. Once built, the instance
// pointer will be updated to point at the new Instance.
With(i *echo.Instance, cfg echo.Config) Builder
// WithConfig mimics the behavior of With, but does not allow passing a reference
// and returns an echoboot builder rather than a generic echo builder.
// TODO rename this to With, and the old method to WithInstance
WithConfig(cfg echo.Config) Builder
// WithClusters will cause subsequent With or WithConfig calls to be applied to the given clusters.
WithClusters(...cluster.Cluster) Builder
// Build and initialize all Echo Instances. Upon returning, the Instance pointers
// are assigned and all Instances are ready to communicate with each other.
Build() (echo.Instances, error)
BuildOrFail(t test.Failer) echo.Instances
}
var _ Builder = builder{}
// New returns a Builder for echo deployments.
func New(ctx resource.Context, clusters ...cluster.Cluster) Builder {
// use all workload clusters unless otherwise specified
if len(clusters) == 0 {
clusters = ctx.Clusters()
}
b := builder{
ctx: ctx,
configs: map[cluster.Kind][]echo.Config{},
refs: map[cluster.Kind][]*echo.Instance{},
namespaces: map[string]namespace.Instance{},
}
templates, err := b.injectionTemplates()
if err != nil {
// deal with this when we call Build() to avoid making the New signature unwieldy
b.errs = multierror.Append(b.errs, fmt.Errorf("failed finding injection templates on clusters: %v", err))
}
b.templates = templates
return b.WithClusters(clusters...)
}
type builder struct {
ctx resource.Context
// clusters contains the current set of clusters that subsequent With calls will be applied to,
// if the Config passed to With does not explicitly choose a cluster.
clusters cluster.Clusters
// configs contains configurations to be built, expanded per-cluster and grouped by cluster Kind.
configs map[cluster.Kind][]echo.Config
// refs contains the references to assign built Instances to, grouped by cluster Kind.
// The length of each refs slice should match the length of the corresponding configs slice.
// Only the first per-cluster entry for a given config should have a non-nil ref.
refs map[cluster.Kind][]*echo.Instance
// namespaces caches namespaces by their prefix; used for converting namespace.Static references
// from configs into actual namespaces
namespaces map[string]namespace.Instance
// templates contains the set of injection templates available in each cluster
templates map[string]sets.Set
// errs contains a multierror for failed validation during With calls
errs error
}
func (b builder) WithConfig(cfg echo.Config) Builder {
return b.With(nil, cfg).(builder)
}
// With adds a new Echo configuration to the Builder. When a Cluster is set on the Config, the
// Config is only applied to that cluster; otherwise it is applied to every cluster specified via
// WithClusters. Once built, if deploying to a single cluster, the instance pointer will be updated
// to point at the new Instance.
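//
// For example, a Config can be pinned to a single cluster (inst, appNS and c are
// illustrative values):
//
//	b := deployment.New(ctx).
//		With(&inst, echo.Config{Service: "a", Namespace: appNS, Cluster: c})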
func (b builder) With(i *echo.Instance, cfg echo.Config) Builder {
if b.ctx.Settings().SkipWorkloadClassesAsSet().Contains(cfg.WorkloadClass()) {
return b
}
cfg = cfg.DeepCopy()
if err := cfg.FillDefaults(b.ctx); err != nil {
b.errs = multierror.Append(b.errs, err)
return b
}
// cache the namespace, so manually added echo.Configs can be a part of it
b.namespaces[cfg.Namespace.Prefix()] = cfg.Namespace
targetClusters := b.clusters
if cfg.Cluster != nil {
targetClusters = cluster.Clusters{cfg.Cluster}
}
// If VM workloads are being skipped in the settings and this config is a VM, it is not an
// error for no cluster to be able to deploy it.
shouldSkip := b.ctx.Settings().Skip(echo.VM) && cfg.IsVM()
deployedTo := 0
for idx, c := range targetClusters {
ec, ok := c.(echo.Cluster)
if !ok {
b.errs = multierror.Append(b.errs, fmt.Errorf("attempted to deploy to %s but it does not implement echo.Cluster", c.Name()))
continue
}
perClusterConfig, ok := ec.CanDeploy(cfg)
if !ok {
continue
}
if !b.validateTemplates(perClusterConfig, c) {
if c.Kind() == cluster.Kubernetes {
scopes.Framework.Warnf("%s does not contain injection templates for %s; skipping deployment", c.Name(), perClusterConfig.ClusterLocalFQDN())
}
// Don't error out when the injection template is missing; just skip this cluster.
shouldSkip = true
continue
}
var ref *echo.Instance
if idx == 0 {
// ref only applies to the first cluster deployed to
// refs shouldn't be used when deploying to multiple targetClusters
// TODO: should we just panic if a ref is passed in a multi-cluster context?
ref = i
}
perClusterConfig = perClusterConfig.DeepCopy()
k := ec.Kind()
perClusterConfig.Cluster = ec
b.configs[k] = append(b.configs[k], perClusterConfig)
b.refs[k] = append(b.refs[k], ref)
deployedTo++
}
if deployedTo == 0 && !shouldSkip {
b.errs = multierror.Append(b.errs, fmt.Errorf("no clusters were eligible for app %s", cfg.Service))
}
return b
}
// WithClusters will cause subsequent With calls to be applied to the given clusters.
func (b builder) WithClusters(clusters ...cluster.Cluster) Builder {
next := b
next.clusters = clusters
return next
}
func (b builder) Build() (out echo.Instances, err error) {
return build(b)
}
// injectionTemplates lists the set of templates for each Kube cluster
func (b builder) injectionTemplates() (map[string]sets.Set, error) {
ns := "dubbo-system"
i, err := istio.Get(b.ctx)
if err != nil {
scopes.Framework.Infof("defaulting to dubbo-system namespace for injection template discovery: %v", err)
} else {
ns = i.Settings().SystemNamespace
}
out := map[string]sets.Set{}
for _, c := range b.ctx.Clusters().Kube() {
out[c.Name()] = sets.New()
// TODO find a place to read revision(s) and avoid listing
cms, err := c.CoreV1().ConfigMaps(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
// take the intersection of the templates available from each revision in this cluster
intersection := sets.New()
for _, item := range cms.Items {
if !strings.HasPrefix(item.Name, "istio-sidecar-injector") {
continue
}
data, err := inject.UnmarshalConfig([]byte(item.Data["config"]))
if err != nil {
return nil, fmt.Errorf("failed parsing injection cm in %s: %v", c.Name(), err)
}
if data.RawTemplates != nil {
t := sets.New()
for name := range data.RawTemplates {
t.Insert(name)
}
// either intersection has not been set or we intersect these templates
// with the current set.
if intersection.IsEmpty() {
intersection = t
} else {
intersection = intersection.Intersection(t)
}
}
}
for name := range intersection {
out[c.Name()].Insert(name)
}
}
return out, nil
}
// build is a plain function rather than a method so that reassignments to b take effect
// (assigning to a value receiver would be ineffective).
func build(b builder) (out echo.Instances, err error) {
start := time.Now()
scopes.Framework.Info("=== BEGIN: Deploy echo instances ===")
defer func() {
if err != nil {
scopes.Framework.Error("=== FAILED: Deploy echo instances ===")
scopes.Framework.Error(err)
} else {
scopes.Framework.Infof("=== SUCCEEDED: Deploy echo instances in %v ===", time.Since(start))
}
}()
// load additional configs
for _, cfg := range *additionalConfigs {
// swap the namespace.Static for a namespace.kube
b, cfg.Namespace = b.getOrCreateNamespace(cfg.Namespace.Prefix())
// register the extra config
b = b.WithConfig(cfg).(builder)
}
// bail early if there were issues during the configuration stage
if b.errs != nil {
return nil, b.errs
}
if err = b.deployServices(); err != nil {
return
}
if out, err = b.deployInstances(); err != nil {
return
}
return
}
func (b builder) getOrCreateNamespace(prefix string) (builder, namespace.Instance) {
ns, ok := b.namespaces[prefix]
if ok {
return b, ns
}
ns, err := namespace.New(b.ctx, namespace.Config{Prefix: prefix, Inject: true})
if err != nil {
b.errs = multierror.Append(b.errs, err)
}
b.namespaces[prefix] = ns
return b, ns
}
// deployServices deploys the Kubernetes Services to all clusters up front. Multicluster meshes
// should have "sameness", i.e. identical Service definitions in every cluster, and creating the
// Services here avoids concurrent writes later.
func (b builder) deployServices() (err error) {
services := make(map[string]string)
for _, cfgs := range b.configs {
for _, cfg := range cfgs {
svc, err := kube.GenerateService(cfg)
if err != nil {
return err
}
if existing, ok := services[cfg.ClusterLocalFQDN()]; ok {
// the Service was already generated for another echo instance's config; make sure the definitions match
if existing != svc {
return fmt.Errorf("inconsistency in %s Service definition:\n%s", cfg.Service, cmp.Diff(existing, svc))
}
}
services[cfg.ClusterLocalFQDN()] = svc
}
}
// Deploy the services to all clusters.
cfg := b.ctx.ConfigKube().New()
for svcNs, svcYaml := range services {
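// the map key is a cluster-local FQDN ("svc.namespace.svc.cluster.local"), so the
// second segment is the namespace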
ns := strings.Split(svcNs, ".")[1]
cfg.YAML(ns, svcYaml)
}
return cfg.Apply(resource.NoCleanup)
}
func (b builder) deployInstances() (instances echo.Instances, err error) {
m := sync.Mutex{}
out := echo.Instances{}
g := multierror.Group{}
// run the builder func for each kind of config in parallel
for kind, configs := range b.configs {
kind := kind
configs := configs
g.Go(func() error {
buildFunc, err := echo.GetBuilder(kind)
if err != nil {
return err
}
instances, err := buildFunc(b.ctx, configs)
if err != nil {
return err
}
// link reference pointers
if err := assignRefs(b.refs[kind], instances); err != nil {
return err
}
// safely merge instances from all cluster kinds into one list
m.Lock()
defer m.Unlock()
out = append(out, instances...)
return nil
})
}
if err := g.Wait().ErrorOrNil(); err != nil {
return nil, err
}
return out, nil
}
func assignRefs(refs []*echo.Instance, instances echo.Instances) error {
if len(refs) != len(instances) {
return fmt.Errorf("cannot set %d references, only %d instances were built", len(refs), len(instances))
}
for i, ref := range refs {
if ref != nil {
*ref = instances[i]
}
}
return nil
}
func (b builder) BuildOrFail(t test.Failer) echo.Instances {
t.Helper()
out, err := b.Build()
if err != nil {
t.Fatal(err)
}
return out
}
// validateTemplates returns true if the templates specified by inject.istio.io/templates on the config exist on c
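// For example, a subset annotated with (template names are illustrative):
//
//	inject.istio.io/templates: "sidecar,grpc-agent"
//
// is only deployable to clusters whose injector configuration contains both the
// "sidecar" and "grpc-agent" templates.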
func (b builder) validateTemplates(config echo.Config, c cluster.Cluster) bool {
expected := sets.New()
for _, subset := range config.Subsets {
expected.InsertAll(parseList(subset.Annotations.Get(echo.SidecarInjectTemplates))...)
}
if b.templates == nil || b.templates[c.Name()] == nil {
return len(expected) == 0
}
return b.templates[c.Name()].SupersetOf(expected)
}
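// parseList splits a comma-separated annotation value into whitespace-trimmed items;
// a blank input yields nil.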
func parseList(s string) []string {
if len(strings.TrimSpace(s)) == 0 {
return nil
}
items := strings.Split(s, ",")
for i := range items {
items[i] = strings.TrimSpace(items[i])
}
return items
}