blob: 24911afb4f86a72ad654ccd453fcff39c3e52381 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package callbacks
import (
import (
import (
mesh_proto ""
core_mesh ""
core_model ""
var sinkLog = core.Log.WithName("xds").WithName("sink")
type DataplaneInsightSink interface {
Start(stop <-chan struct{})
type DataplaneInsightStore interface {
// Upsert creates or updates the subscription, storing it with
// the key dataplaneID. dataplaneType gives the resource type of
// the dataplane proxy that has subscribed.
Upsert(ctx context.Context, dataplaneType core_model.ResourceType, dataplaneID core_model.ResourceKey, subscription *mesh_proto.DiscoverySubscription) error
func NewDataplaneInsightSink(
dataplaneType core_model.ResourceType,
accessor SubscriptionStatusAccessor,
newTicker func() *time.Ticker,
generationTicker func() *time.Ticker,
flushBackoff time.Duration,
store DataplaneInsightStore,
) DataplaneInsightSink {
return &dataplaneInsightSink{
flushTicker: newTicker,
generationTicker: generationTicker,
dataplaneType: dataplaneType,
accessor: accessor,
flushBackoff: flushBackoff,
store: store,
var _ DataplaneInsightSink = &dataplaneInsightSink{}
type dataplaneInsightSink struct {
flushTicker func() *time.Ticker
generationTicker func() *time.Ticker
dataplaneType core_model.ResourceType
accessor SubscriptionStatusAccessor
store DataplaneInsightStore
flushBackoff time.Duration
func (s *dataplaneInsightSink) Start(stop <-chan struct{}) {
flushTicker := s.flushTicker()
defer flushTicker.Stop()
generationTicker := s.generationTicker()
defer generationTicker.Stop()
var lastStoredState *mesh_proto.DiscoverySubscription
var generation uint32
flush := func(closing bool) {
dataplaneID, currentState := s.accessor.GetStatus()
select {
case <-generationTicker.C:
currentState.Generation = generation
if proto.Equal(currentState, lastStoredState) {
ctx := context.TODO()
if err :=, s.dataplaneType, dataplaneID, currentState); err != nil {
switch {
case closing:
// When XDS stream is closed, Dataplane Status Tracker executes OnStreamClose which closes stop channel
// The problem is that close() does not wait for this sink to do it's final work
// In the meantime Dataplane Lifecycle executes OnStreamClose which can remove Dataplane entity (and Insights due to ownership). Therefore both scenarios can happen:
// 1) upsert fail because it successfully retrieved DataplaneInsight but cannot Update because by this time, Insight is gone (ResourceConflict error)
// 2) upsert fail because it tries to create a new insight, but there is no Dataplane so ownership returns an error
// We could build a synchronous mechanism that waits for Sink to be stopped before moving on to next Callbacks, but this is potentially dangerous
// that we could block waiting for storage instead of executing next callbacks.
sinkLog.V(1).Info("failed to flush Dataplane status on stream close. It can happen when Dataplane is deleted at the same time",
"dataplaneid", dataplaneID,
"err", err)
case errors.Is(err, &store.ResourceConflictError{}):
sinkLog.V(1).Info("failed to flush DataplaneInsight because it was updated in other place. Will retry in the next tick",
"dataplaneid", dataplaneID)
sinkLog.Error(err, "failed to flush DataplaneInsight", "dataplaneid", dataplaneID)
} else {
sinkLog.V(1).Info("DataplaneInsight saved", "dataplaneid", dataplaneID, "subscription", currentState)
lastStoredState = currentState
// flush the first insight as quickly as possible so
// 1) user sees that DP is online in dubboctl/GUI (even without any XDS updates)
// 2) we can have lower deregistrationDelay, see pkg/xds/server/callbacks/dataplane_lifecycle.go#deregisterProxy
for {
select {
case <-flushTicker.C:
// On Kubernetes, because of the cache subsequent Get, Update requests can fail, because the cache is not strongly consistent.
// We handle the Resource Conflict logging on V1, but we can try to avoid the situation with backoff
case <-stop:
func NewDataplaneInsightStore(resManager manager.ResourceManager) DataplaneInsightStore {
return &dataplaneInsightStore{
resManager: resManager,
var _ DataplaneInsightStore = &dataplaneInsightStore{}
type dataplaneInsightStore struct {
resManager manager.ResourceManager
func (s *dataplaneInsightStore) Upsert(ctx context.Context, dataplaneType core_model.ResourceType, dataplaneID core_model.ResourceKey, subscription *mesh_proto.DiscoverySubscription) error {
switch dataplaneType {
case core_mesh.ZoneIngressType:
return manager.Upsert(ctx, s.resManager, dataplaneID, core_mesh.NewZoneIngressInsightResource(), func(resource core_model.Resource) error {
insight := resource.(*core_mesh.ZoneIngressInsightResource)
return insight.Spec.UpdateSubscription(subscription)
case core_mesh.DataplaneType:
return manager.Upsert(ctx, s.resManager, dataplaneID, core_mesh.NewDataplaneInsightResource(), func(resource core_model.Resource) error {
insight := resource.(*core_mesh.DataplaneInsightResource)
if err := insight.Spec.UpdateSubscription(subscription); err != nil {
return err
return nil
// Return a designated precondition error since we don't expect other dataplane types.
return store.ErrorResourceAssertion("invalid dataplane type", dataplaneType, dataplaneID.Mesh, dataplaneID.Name)