| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package callbacks |
| |
| import ( |
| "context" |
| "os" |
| "sync" |
| ) |
| |
| import ( |
| "github.com/pkg/errors" |
| ) |
| |
| import ( |
| core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model" |
| core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds" |
| util_xds "github.com/apache/dubbo-kubernetes/pkg/util/xds" |
| ) |
| |
| // DataplaneCallbacks are XDS callbacks that keep the context of Dubbo Dataplane. |
| // In the ideal world we could assume that one Dataplane has one xDS stream. |
| // Due to race network latencies etc. there might be a situation when one Dataplane has many xDS streams for the short period of time. |
| // Those callbacks helps us to deal with such situation. |
| // |
| // Keep in mind that it does not solve many xDS streams across many instances of the Control Plane. |
| // If there are many instances of the Control Plane and Dataplane reconnects, there might be an old stream |
| // in one instance of CP and a new stream in a new instance of CP. |
| type DataplaneCallbacks interface { |
| // OnProxyConnected is executed when proxy is connected after it was disconnected before. |
| OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error |
| // OnProxyReconnected is executed when proxy is already connected, but there is another stream. |
| // This can happen when there is a delay with closing the old connection from the proxy to the control plane. |
| OnProxyReconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error |
| // OnProxyDisconnected is executed only when the last stream of the proxy disconnects. |
| OnProxyDisconnected(ctx context.Context, streamID core_xds.StreamID, dpKey core_model.ResourceKey) |
| } |
| |
| type xdsCallbacks struct { |
| callbacks DataplaneCallbacks |
| util_xds.NoopCallbacks |
| |
| sync.RWMutex |
| dpStreams map[core_xds.StreamID]dpStream |
| activeStreams map[core_model.ResourceKey]int |
| } |
| |
| func DataplaneCallbacksToXdsCallbacks(callbacks DataplaneCallbacks) util_xds.Callbacks { |
| return &xdsCallbacks{ |
| callbacks: callbacks, |
| dpStreams: map[core_xds.StreamID]dpStream{}, |
| activeStreams: map[core_model.ResourceKey]int{}, |
| } |
| } |
| |
| type dpStream struct { |
| dp *core_model.ResourceKey |
| ctx context.Context |
| } |
| |
| var _ util_xds.Callbacks = &xdsCallbacks{} |
| |
| func (d *xdsCallbacks) OnStreamClosed(streamID core_xds.StreamID) { |
| var lastStreamDpKey *core_model.ResourceKey |
| d.Lock() |
| dpStream := d.dpStreams[streamID] |
| if dpKey := dpStream.dp; dpKey != nil { |
| d.activeStreams[*dpKey]-- |
| if d.activeStreams[*dpKey] == 0 { |
| lastStreamDpKey = dpKey |
| delete(d.activeStreams, *dpKey) |
| } |
| } |
| delete(d.dpStreams, streamID) |
| d.Unlock() |
| if lastStreamDpKey != nil { |
| // execute callback after lock is freed, so heavy callback implementation won't block every callback for every DPP. |
| d.callbacks.OnProxyDisconnected(dpStream.ctx, streamID, *lastStreamDpKey) |
| } |
| } |
| |
| func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_xds.DiscoveryRequest) error { |
| if request.NodeId() == "" { |
| // from https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-versioning: |
| // Only the first request on a stream is guaranteed to carry the node identifier. |
| // The subsequent discovery requests on the same stream may carry an empty node identifier. |
| // This holds true regardless of the acceptance of the discovery responses on the same stream. |
| // The node identifier should always be identical if present more than once on the stream. |
| // It is sufficient to only check the first message for the node identifier as a result. |
| return nil |
| } |
| |
| d.RLock() |
| alreadyProcessed := d.dpStreams[streamID].dp != nil |
| d.RUnlock() |
| if alreadyProcessed { |
| return nil |
| } |
| |
| proxyId, err := core_xds.ParseProxyIdFromString(request.NodeId()) |
| if err != nil { |
| return errors.Wrap(err, "invalid1 node ID") |
| } |
| dpKey := proxyId.ToResourceKey() |
| metadata := core_xds.DataplaneMetadataFromXdsMetadata(request.Metadata(), os.TempDir(), dpKey) |
| if metadata == nil { |
| return errors.New("metadata in xDS Node cannot be nil") |
| } |
| |
| d.Lock() |
| // in case client will open 2 concurrent request for the same streamID then |
| // we don't to increment the counter twice, so checking once again that stream |
| // wasn't processed |
| alreadyProcessed = d.dpStreams[streamID].dp != nil |
| if alreadyProcessed { |
| return nil |
| } |
| |
| dpStream := d.dpStreams[streamID] |
| dpStream.dp = &dpKey |
| d.dpStreams[streamID] = dpStream |
| |
| activeStreams := d.activeStreams[dpKey] |
| d.activeStreams[dpKey]++ |
| d.Unlock() |
| |
| if activeStreams == 0 { |
| if err := d.callbacks.OnProxyConnected(streamID, dpKey, dpStream.ctx, *metadata); err != nil { |
| return err |
| } |
| } else { |
| if err := d.callbacks.OnProxyReconnected(streamID, dpKey, dpStream.ctx, *metadata); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func (d *xdsCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { |
| d.Lock() |
| defer d.Unlock() |
| dps := dpStream{ |
| ctx: ctx, |
| } |
| d.dpStreams[streamID] = dps |
| return nil |
| } |
| |
| // NoopDataplaneCallbacks are empty callbacks that helps to implement DataplaneCallbacks without need to implement every function. |
| type NoopDataplaneCallbacks struct{} |
| |
| func (n *NoopDataplaneCallbacks) OnProxyReconnected(core_xds.StreamID, core_model.ResourceKey, context.Context, core_xds.DataplaneMetadata) error { |
| return nil |
| } |
| |
| func (n *NoopDataplaneCallbacks) OnProxyConnected(core_xds.StreamID, core_model.ResourceKey, context.Context, core_xds.DataplaneMetadata) error { |
| return nil |
| } |
| |
| func (n *NoopDataplaneCallbacks) OnProxyDisconnected(context.Context, core_xds.StreamID, core_model.ResourceKey) { |
| } |
| |
| var _ DataplaneCallbacks = &NoopDataplaneCallbacks{} |