blob: d77e8f128b1205f32cb637844d381e93856eadb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package callbacks
import (
"context"
"os"
"sync"
)
import (
"github.com/pkg/errors"
)
import (
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
util_xds "github.com/apache/dubbo-kubernetes/pkg/util/xds"
)
// DataplaneCallbacks are xDS callbacks that keep the context of a Dubbo Dataplane.
// In an ideal world we could assume that one Dataplane has exactly one xDS stream.
// Due to races, network latencies, etc. there might be a situation when one Dataplane
// has many xDS streams for a short period of time.
// These callbacks help us deal with such a situation.
//
// Keep in mind that this does not solve the problem of many xDS streams across many instances of the Control Plane.
// If there are many instances of the Control Plane and a Dataplane reconnects, there might be an old stream
// in one instance of CP and a new stream in a new instance of CP.
type DataplaneCallbacks interface {
// OnProxyConnected is executed when a proxy connects while it has no other active stream,
// i.e. on its first connection or after it was fully disconnected before.
// A non-nil error is returned to the caller of OnStreamRequest.
OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error
// OnProxyReconnected is executed when the proxy is already connected, but there is another stream.
// This can happen when there is a delay with closing the old connection from the proxy to the control plane.
// A non-nil error is returned to the caller of OnStreamRequest.
OnProxyReconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error
// OnProxyDisconnected is executed only when the last stream of the proxy disconnects.
// streamID identifies the stream whose closing triggered the call.
OnProxyDisconnected(ctx context.Context, streamID core_xds.StreamID, dpKey core_model.ResourceKey)
}
// xdsCallbacks adapts DataplaneCallbacks to util_xds.Callbacks. It tracks which
// Dataplane every xDS stream belongs to and how many streams each Dataplane has,
// so the wrapped callbacks can tell a first connection from an additional stream.
type xdsCallbacks struct {
// callbacks receives the connected / reconnected / disconnected notifications.
callbacks DataplaneCallbacks
// NOTE(review): presumably supplies no-op defaults for the util_xds.Callbacks
// methods that xdsCallbacks does not define itself; confirm in util_xds.
util_xds.NoopCallbacks
// The embedded RWMutex guards dpStreams and activeStreams.
sync.RWMutex
// dpStreams holds per-stream state keyed by stream ID. Entries are created in
// OnStreamOpen and removed in OnStreamClosed.
dpStreams map[core_xds.StreamID]dpStream
// activeStreams counts the streams currently bound to each Dataplane key.
// A key is deleted once its counter drops to zero.
activeStreams map[core_model.ResourceKey]int
}
// DataplaneCallbacksToXdsCallbacks wraps the given DataplaneCallbacks in an
// adapter that implements util_xds.Callbacks. The adapter starts with empty
// stream bookkeeping.
func DataplaneCallbacksToXdsCallbacks(callbacks DataplaneCallbacks) util_xds.Callbacks {
	adapter := &xdsCallbacks{callbacks: callbacks}
	adapter.dpStreams = make(map[core_xds.StreamID]dpStream)
	adapter.activeStreams = make(map[core_model.ResourceKey]int)
	return adapter
}
// dpStream is the state kept for a single xDS stream.
type dpStream struct {
// dp is the key of the Dataplane the stream is bound to. It stays nil until
// OnStreamRequest has processed a request carrying a node ID.
dp *core_model.ResourceKey
// ctx is the context received in OnStreamOpen; it is handed to the DataplaneCallbacks.
ctx context.Context
}
// Compile-time check that *xdsCallbacks implements util_xds.Callbacks.
var _ util_xds.Callbacks = (*xdsCallbacks)(nil)
// OnStreamClosed forgets the given stream. If the stream was bound to a
// Dataplane and it was the last active stream of that Dataplane, it calls
// OnProxyDisconnected with the context stored for the stream.
func (d *xdsCallbacks) OnStreamClosed(streamID core_xds.StreamID) {
	d.Lock()
	stream := d.dpStreams[streamID]
	delete(d.dpStreams, streamID)
	wasLast := false
	if stream.dp != nil {
		remaining := d.activeStreams[*stream.dp] - 1
		if remaining == 0 {
			delete(d.activeStreams, *stream.dp)
			wasLast = true
		} else {
			d.activeStreams[*stream.dp] = remaining
		}
	}
	d.Unlock()

	if !wasLast {
		return
	}
	// The callback runs after the lock is released, so a heavy callback
	// implementation does not block the callbacks of every other DPP.
	d.callbacks.OnProxyDisconnected(stream.ctx, streamID, *stream.dp)
}
// OnStreamRequest binds an xDS stream to the Dataplane named by the node ID of
// the first discovery request that carries one, and notifies the wrapped
// DataplaneCallbacks.
//
// OnProxyConnected is called when the Dataplane had no active stream tracked by
// this xdsCallbacks; OnProxyReconnected is called when it already had at least
// one. Requests with an empty node ID, and requests on a stream that is already
// bound, are ignored and return nil.
//
// An error is returned when the node ID cannot be parsed, when
// DataplaneMetadataFromXdsMetadata yields nil, or when the invoked callback fails.
func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_xds.DiscoveryRequest) error {
	if request.NodeId() == "" {
		// from https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-versioning:
		// Only the first request on a stream is guaranteed to carry the node identifier.
		// The subsequent discovery requests on the same stream may carry an empty node identifier.
		// This holds true regardless of the acceptance of the discovery responses on the same stream.
		// The node identifier should always be identical if present more than once on the stream.
		// It is sufficient to only check the first message for the node identifier as a result.
		return nil
	}

	// Fast path under the read lock: skip parsing when the stream is already bound.
	d.RLock()
	alreadyProcessed := d.dpStreams[streamID].dp != nil
	d.RUnlock()
	if alreadyProcessed {
		return nil
	}

	proxyID, err := core_xds.ParseProxyIdFromString(request.NodeId())
	if err != nil {
		return errors.Wrap(err, "invalid node ID")
	}
	dpKey := proxyID.ToResourceKey()
	metadata := core_xds.DataplaneMetadataFromXdsMetadata(request.Metadata(), os.TempDir(), dpKey)
	if metadata == nil {
		return errors.New("metadata in xDS Node cannot be nil")
	}

	d.Lock()
	// In case the client opens 2 concurrent requests for the same streamID we
	// don't want to increment the counter twice, so check once again under the
	// write lock that the stream wasn't processed in the meantime.
	stream := d.dpStreams[streamID]
	if stream.dp != nil {
		// The lock must be released on this path too; returning with it held
		// would block every subsequent callback.
		d.Unlock()
		return nil
	}
	stream.dp = &dpKey
	d.dpStreams[streamID] = stream
	activeStreams := d.activeStreams[dpKey]
	d.activeStreams[dpKey]++
	d.Unlock()

	// Callbacks run outside the lock so a slow implementation does not stall other streams.
	if activeStreams == 0 {
		return d.callbacks.OnProxyConnected(streamID, dpKey, stream.ctx, *metadata)
	}
	return d.callbacks.OnProxyReconnected(streamID, dpKey, stream.ctx, *metadata)
}
// OnStreamOpen records the context of a newly opened stream under its stream
// ID, replacing any entry already stored for that ID. The stream is not bound
// to a Dataplane yet. It always returns nil.
func (d *xdsCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error {
	d.Lock()
	defer d.Unlock()

	d.dpStreams[streamID] = dpStream{ctx: ctx}
	return nil
}
// NoopDataplaneCallbacks are empty callbacks that help to implement DataplaneCallbacks
// without the need to implement every function: embed it and override only the methods you need.
type NoopDataplaneCallbacks struct{}
// OnProxyReconnected does nothing and always returns nil.
func (*NoopDataplaneCallbacks) OnProxyReconnected(_ core_xds.StreamID, _ core_model.ResourceKey, _ context.Context, _ core_xds.DataplaneMetadata) error {
	return nil
}
// OnProxyConnected does nothing and always returns nil.
func (*NoopDataplaneCallbacks) OnProxyConnected(_ core_xds.StreamID, _ core_model.ResourceKey, _ context.Context, _ core_xds.DataplaneMetadata) error {
	return nil
}
// OnProxyDisconnected does nothing.
func (*NoopDataplaneCallbacks) OnProxyDisconnected(_ context.Context, _ core_xds.StreamID, _ core_model.ResourceKey) {
}
// Compile-time check that *NoopDataplaneCallbacks implements DataplaneCallbacks.
var _ DataplaneCallbacks = (*NoopDataplaneCallbacks)(nil)