blob: 051d70ab4c366090e899fad20657760ed4648a59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v3
import (
"context"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core"
model "github.com/apache/dubbo-kubernetes/pkg/core/xds"
util_xds "github.com/apache/dubbo-kubernetes/pkg/util/xds"
xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
"github.com/apache/dubbo-kubernetes/pkg/xds/generator"
xds_sync "github.com/apache/dubbo-kubernetes/pkg/xds/sync"
)
var reconcileLog = core.Log.WithName("xds-server").WithName("reconcile")
var _ xds_sync.SnapshotReconciler = &reconciler{}
type reconciler struct {
generator snapshotGenerator
cacher snapshotCacher
statsCallbacks util_xds.StatsCallbacks
}
func (r *reconciler) Clear(proxyId *model.ProxyId) error {
nodeId := &envoy_core.Node{Id: proxyId.String()}
r.clearUndeliveredConfigStats(nodeId)
r.cacher.Clear(nodeId)
return nil
}
func (r *reconciler) clearUndeliveredConfigStats(nodeId *envoy_core.Node) {
snap, err := r.cacher.Get(nodeId)
if err != nil {
return // already cleared
}
for _, res := range snap.Resources {
if res.Version != "" {
r.statsCallbacks.DiscardConfig(res.Version)
}
}
}
func (r *reconciler) Reconcile(ctx context.Context, xdsCtx xds_context.Context, proxy *model.Proxy) (bool, error) {
node := &envoy_core.Node{Id: proxy.Id.String()}
snapshot, err := r.generator.GenerateSnapshot(ctx, xdsCtx, proxy)
if err != nil {
return false, errors.Wrapf(err, "failed to generate a snapshot")
}
// To avoid assigning a new version every time, compare with
// the previous snapshot and reuse its version whenever possible,
// fallback to UUID otherwise
previous, err := r.cacher.Get(node)
if err != nil {
previous = &envoy_cache.Snapshot{}
}
snapshot, changed := autoVersion(previous, snapshot)
resKey := proxy.Id.ToResourceKey()
log := reconcileLog.WithValues("proxyName", resKey.Name, "mesh", resKey.Mesh)
// Validate the resources we reconciled before sending them
// to Envoy. This ensures that we have as much in-band error
// information as possible, which is especially useful for tests
// that don't actually program an Envoy instance.
if len(changed) == 0 {
log.V(1).Info("config is the same")
return false, nil
}
for _, resources := range snapshot.Resources {
for name, resource := range resources.Items {
if err := validateResource(resource.Resource); err != nil {
return false, errors.Wrapf(err, "invalid resource %q", name)
}
}
}
if err := snapshot.Consistent(); err != nil {
log.Error(err, "inconsistent snapshot", "snapshot", snapshot, "proxy", proxy)
return false, errors.Wrap(err, "inconsistent snapshot")
}
log.Info("config has changed", "versions", changed)
if err := r.cacher.Cache(ctx, node, snapshot); err != nil {
return false, errors.Wrap(err, "failed to store snapshot")
}
for _, version := range changed {
r.statsCallbacks.ConfigReadyForDelivery(version)
}
return true, nil
}
func validateResource(r envoy_types.Resource) error {
switch v := r.(type) {
// Newer go-control-plane versions have `ValidateAll()` method, that accumulates as many validation errors as possible.
case interface{ ValidateAll() error }:
return v.ValidateAll()
// Older go-control-plane stops validation at the first error.
case interface{ Validate() error }:
return v.Validate()
default:
return nil
}
}
func autoVersion(old *envoy_cache.Snapshot, new *envoy_cache.Snapshot) (*envoy_cache.Snapshot, []string) {
for resourceType, resources := range old.Resources {
new.Resources[resourceType] = reuseVersion(resources, new.Resources[resourceType])
}
var changed []string
for resourceType, resource := range new.Resources {
if old.Resources[resourceType].Version != resource.Version {
changed = append(changed, resource.Version)
}
}
return new, changed
}
func reuseVersion(old, new envoy_cache.Resources) envoy_cache.Resources {
new.Version = old.Version
if !equalSnapshots(old.Items, new.Items) {
new.Version = core.NewUUID()
}
return new
}
func equalSnapshots(old, new map[string]envoy_types.ResourceWithTTL) bool {
if len(new) != len(old) {
return false
}
for key, newValue := range new {
if oldValue, hasOldValue := old[key]; !hasOldValue || !proto.Equal(newValue.Resource, oldValue.Resource) {
return false
}
}
return true
}
type snapshotGenerator interface {
GenerateSnapshot(context.Context, xds_context.Context, *model.Proxy) (*envoy_cache.Snapshot, error)
}
type TemplateSnapshotGenerator struct {
Resolver []string
}
func (s *TemplateSnapshotGenerator) GenerateSnapshot(ctx context.Context, xdsCtx xds_context.Context, proxy *model.Proxy) (*envoy_cache.Snapshot, error) {
gen := generator.ProxyTemplateGenerator{ProfileName: s.Resolver}
rs, err := gen.Generate(ctx, xdsCtx, proxy)
if err != nil {
reconcileLog.Error(err, "failed to generate a snapshot", "proxy", proxy)
return nil, err
}
version := "" // empty value is a sign to other components to generate the version automatically
resources := map[envoy_resource.Type][]envoy_types.Resource{}
for _, resourceType := range rs.ResourceTypes() {
resources[resourceType] = append(resources[resourceType], rs.ListOf(resourceType).Payloads()...)
}
return envoy_cache.NewSnapshot(version, resources)
}
type snapshotCacher interface {
Get(*envoy_core.Node) (*envoy_cache.Snapshot, error)
Cache(context.Context, *envoy_core.Node, *envoy_cache.Snapshot) error
Clear(*envoy_core.Node)
}
type simpleSnapshotCacher struct {
hasher envoy_cache.NodeHash
store envoy_cache.SnapshotCache
}
func (s *simpleSnapshotCacher) Get(node *envoy_core.Node) (*envoy_cache.Snapshot, error) {
snap, err := s.store.GetSnapshot(s.hasher.ID(node))
if snap != nil {
snapshot, ok := snap.(*envoy_cache.Snapshot)
if !ok {
return nil, errors.New("couldn't convert snapshot from cache to envoy Snapshot")
}
return snapshot, nil
}
return nil, err
}
func (s *simpleSnapshotCacher) Cache(ctx context.Context, node *envoy_core.Node, snapshot *envoy_cache.Snapshot) error {
return s.store.SetSnapshot(ctx, s.hasher.ID(node), snapshot)
}
func (s *simpleSnapshotCacher) Clear(node *envoy_core.Node) {
s.store.ClearSnapshot(s.hasher.ID(node))
}