blob: 051d70ab4c366090e899fad20657760ed4648a59 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package v3
import (
import (
envoy_core ""
envoy_types ""
envoy_cache ""
envoy_resource ""
import (
model ""
util_xds ""
xds_context ""
xds_sync ""
var reconcileLog = core.Log.WithName("xds-server").WithName("reconcile")
var _ xds_sync.SnapshotReconciler = &reconciler{}
type reconciler struct {
generator snapshotGenerator
cacher snapshotCacher
statsCallbacks util_xds.StatsCallbacks
func (r *reconciler) Clear(proxyId *model.ProxyId) error {
nodeId := &envoy_core.Node{Id: proxyId.String()}
return nil
func (r *reconciler) clearUndeliveredConfigStats(nodeId *envoy_core.Node) {
snap, err := r.cacher.Get(nodeId)
if err != nil {
return // already cleared
for _, res := range snap.Resources {
if res.Version != "" {
func (r *reconciler) Reconcile(ctx context.Context, xdsCtx xds_context.Context, proxy *model.Proxy) (bool, error) {
node := &envoy_core.Node{Id: proxy.Id.String()}
snapshot, err := r.generator.GenerateSnapshot(ctx, xdsCtx, proxy)
if err != nil {
return false, errors.Wrapf(err, "failed to generate a snapshot")
// To avoid assigning a new version every time, compare with
// the previous snapshot and reuse its version whenever possible,
// fallback to UUID otherwise
previous, err := r.cacher.Get(node)
if err != nil {
previous = &envoy_cache.Snapshot{}
snapshot, changed := autoVersion(previous, snapshot)
resKey := proxy.Id.ToResourceKey()
log := reconcileLog.WithValues("proxyName", resKey.Name, "mesh", resKey.Mesh)
// Validate the resources we reconciled before sending them
// to Envoy. This ensures that we have as much in-band error
// information as possible, which is especially useful for tests
// that don't actually program an Envoy instance.
if len(changed) == 0 {
log.V(1).Info("config is the same")
return false, nil
for _, resources := range snapshot.Resources {
for name, resource := range resources.Items {
if err := validateResource(resource.Resource); err != nil {
return false, errors.Wrapf(err, "invalid resource %q", name)
if err := snapshot.Consistent(); err != nil {
log.Error(err, "inconsistent snapshot", "snapshot", snapshot, "proxy", proxy)
return false, errors.Wrap(err, "inconsistent snapshot")
log.Info("config has changed", "versions", changed)
if err := r.cacher.Cache(ctx, node, snapshot); err != nil {
return false, errors.Wrap(err, "failed to store snapshot")
for _, version := range changed {
return true, nil
func validateResource(r envoy_types.Resource) error {
switch v := r.(type) {
// Newer go-control-plane versions have `ValidateAll()` method, that accumulates as many validation errors as possible.
case interface{ ValidateAll() error }:
return v.ValidateAll()
// Older go-control-plane stops validation at the first error.
case interface{ Validate() error }:
return v.Validate()
return nil
func autoVersion(old *envoy_cache.Snapshot, new *envoy_cache.Snapshot) (*envoy_cache.Snapshot, []string) {
for resourceType, resources := range old.Resources {
new.Resources[resourceType] = reuseVersion(resources, new.Resources[resourceType])
var changed []string
for resourceType, resource := range new.Resources {
if old.Resources[resourceType].Version != resource.Version {
changed = append(changed, resource.Version)
return new, changed
func reuseVersion(old, new envoy_cache.Resources) envoy_cache.Resources {
new.Version = old.Version
if !equalSnapshots(old.Items, new.Items) {
new.Version = core.NewUUID()
return new
func equalSnapshots(old, new map[string]envoy_types.ResourceWithTTL) bool {
if len(new) != len(old) {
return false
for key, newValue := range new {
if oldValue, hasOldValue := old[key]; !hasOldValue || !proto.Equal(newValue.Resource, oldValue.Resource) {
return false
return true
type snapshotGenerator interface {
GenerateSnapshot(context.Context, xds_context.Context, *model.Proxy) (*envoy_cache.Snapshot, error)
type TemplateSnapshotGenerator struct {
Resolver []string
func (s *TemplateSnapshotGenerator) GenerateSnapshot(ctx context.Context, xdsCtx xds_context.Context, proxy *model.Proxy) (*envoy_cache.Snapshot, error) {
gen := generator.ProxyTemplateGenerator{ProfileName: s.Resolver}
rs, err := gen.Generate(ctx, xdsCtx, proxy)
if err != nil {
reconcileLog.Error(err, "failed to generate a snapshot", "proxy", proxy)
return nil, err
version := "" // empty value is a sign to other components to generate the version automatically
resources := map[envoy_resource.Type][]envoy_types.Resource{}
for _, resourceType := range rs.ResourceTypes() {
resources[resourceType] = append(resources[resourceType], rs.ListOf(resourceType).Payloads()...)
return envoy_cache.NewSnapshot(version, resources)
type snapshotCacher interface {
Get(*envoy_core.Node) (*envoy_cache.Snapshot, error)
Cache(context.Context, *envoy_core.Node, *envoy_cache.Snapshot) error
type simpleSnapshotCacher struct {
hasher envoy_cache.NodeHash
store envoy_cache.SnapshotCache
func (s *simpleSnapshotCacher) Get(node *envoy_core.Node) (*envoy_cache.Snapshot, error) {
snap, err :=
if snap != nil {
snapshot, ok := snap.(*envoy_cache.Snapshot)
if !ok {
return nil, errors.New("couldn't convert snapshot from cache to envoy Snapshot")
return snapshot, nil
return nil, err
func (s *simpleSnapshotCacher) Cache(ctx context.Context, node *envoy_core.Node, snapshot *envoy_cache.Snapshot) error {
return, s.hasher.ID(node), snapshot)
func (s *simpleSnapshotCacher) Clear(node *envoy_core.Node) {