blob: 72911b6861e6c2fc835b4b8f7a6991dbcfff50d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package callbacks
import (
"context"
"os"
"strings"
"sync"
)
import (
"google.golang.org/protobuf/proto"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
util_proto "github.com/apache/dubbo-kubernetes/pkg/util/proto"
util_xds "github.com/apache/dubbo-kubernetes/pkg/util/xds"
)
var statusTrackerLog = core.Log.WithName("xds").WithName("status-tracker")
type DataplaneStatusTracker interface {
util_xds.Callbacks
GetStatusAccessor(streamID int64) (SubscriptionStatusAccessor, bool)
}
type SubscriptionStatusAccessor interface {
GetStatus() (core_model.ResourceKey, *mesh_proto.DiscoverySubscription)
}
type DataplaneInsightSinkFactoryFunc = func(core_model.ResourceType, SubscriptionStatusAccessor) DataplaneInsightSink
func NewDataplaneStatusTracker(
runtimeInfo core_runtime.RuntimeInfo,
createStatusSink DataplaneInsightSinkFactoryFunc,
) DataplaneStatusTracker {
return &dataplaneStatusTracker{
runtimeInfo: runtimeInfo,
createStatusSink: createStatusSink,
streams: make(map[int64]*streamState),
}
}
var _ DataplaneStatusTracker = &dataplaneStatusTracker{}
type dataplaneStatusTracker struct {
util_xds.NoopCallbacks
runtimeInfo core_runtime.RuntimeInfo
createStatusSink DataplaneInsightSinkFactoryFunc
mu sync.RWMutex // protects access to the fields below
streams map[int64]*streamState
}
type streamState struct {
stop chan struct{} // is used for stopping a goroutine that flushes Dataplane status periodically
mu sync.RWMutex // protects access to the fields below
dataplaneId core_model.ResourceKey
subscription *mesh_proto.DiscoverySubscription
}
// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
func (c *dataplaneStatusTracker) OnStreamOpen(ctx context.Context, streamID int64, typ string) error {
c.mu.Lock() // write access to the map of all ADS streams
defer c.mu.Unlock()
// initialize subscription
now := core.Now()
subscription := &mesh_proto.DiscoverySubscription{
Id: core.NewUUID(),
ControlPlaneInstanceId: c.runtimeInfo.GetInstanceId(),
ConnectTime: util_proto.MustTimestampProto(now),
Status: mesh_proto.NewSubscriptionStatus(now),
Version: mesh_proto.NewVersion(),
}
// initialize state per ADS stream
state := &streamState{
stop: make(chan struct{}),
subscription: subscription,
}
// save
c.streams[streamID] = state
statusTrackerLog.V(1).Info("proxy connecting", "streamID", streamID, "type", typ, "subscriptionID", subscription.Id)
return nil
}
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
func (c *dataplaneStatusTracker) OnStreamClosed(streamID int64) {
c.mu.Lock() // write access to the map of all ADS streams
defer c.mu.Unlock()
state := c.streams[streamID]
if state == nil {
statusTrackerLog.Info("[WARNING] proxy disconnected but no state in the status_tracker", "streamID", streamID)
return
}
delete(c.streams, streamID)
// finilize subscription
state.mu.Lock() // write access to the per Dataplane info
subscription := state.subscription
subscription.DisconnectTime = util_proto.MustTimestampProto(core.Now())
state.mu.Unlock()
// trigger final flush
state.Close()
log := statusTrackerLog.WithValues(
"streamID", streamID,
"proxyName", state.dataplaneId.Name,
"mesh", state.dataplaneId.Mesh,
"subscriptionID", state.subscription.Id,
)
if statusTrackerLog.V(1).Enabled() {
log = log.WithValues("subscription", subscription)
}
log.Info("proxy disconnected")
}
// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
func (c *dataplaneStatusTracker) OnStreamRequest(streamID int64, req util_xds.DiscoveryRequest) error {
c.mu.RLock() // read access to the map of all ADS streams
defer c.mu.RUnlock()
state := c.streams[streamID]
state.mu.Lock() // write access to the per Dataplane info
defer state.mu.Unlock()
if state.dataplaneId == (core_model.ResourceKey{}) {
// Infer the Dataplane ID.
if proxyId, err := core_xds.ParseProxyIdFromString(req.NodeId()); err == nil {
state.dataplaneId = proxyId.ToResourceKey()
var dpType core_model.ResourceType
md := core_xds.DataplaneMetadataFromXdsMetadata(req.Metadata(), os.TempDir(), state.dataplaneId)
// If the dataplane was started with a resource YAML, then it
// will be serialized in the node metadata and we would know
// the underlying type directly. Since that is optional, we
// can't depend on it here, so we map from the proxy type,
// which is guaranteed.
switch md.GetProxyType() {
case mesh_proto.IngressProxyType:
dpType = core_mesh.ZoneIngressType
case mesh_proto.DataplaneProxyType:
dpType = core_mesh.DataplaneType
}
log := statusTrackerLog.WithValues(
"proxyName", state.dataplaneId.Name,
"mesh", state.dataplaneId.Mesh,
"streamID", streamID,
"type", md.GetProxyType(),
"subscriptionID", state.subscription.Id,
)
if statusTrackerLog.V(1).Enabled() {
log = log.WithValues("node", req.Node())
}
log.Info("proxy connected")
statusTrackerLog.Error(err, "failed to extract version out of the Envoy metadata", "streamid", streamID, "metadata", req.Metadata())
// Kick off the async Dataplane status flusher.
go c.createStatusSink(dpType, state).Start(state.stop)
} else {
statusTrackerLog.Error(err, "failed to parse Dataplane Id out of DiscoveryRequest", "streamid", streamID, "req", req)
}
}
subscription := state.subscription
log := statusTrackerLog.WithValues(
"proxyName", state.dataplaneId.Name,
"mesh", state.dataplaneId.Mesh,
"streamID", streamID,
"type", shortEnvoyType(req.GetTypeUrl()),
"resourceVersion", req.VersionInfo(),
)
if statusTrackerLog.V(1).Enabled() {
log = log.WithValues(
"resourceNames", req.GetResourceNames(),
"subscriptionID", subscription.Id,
"nonce", req.GetResponseNonce(),
)
}
// update Dataplane status
if req.GetResponseNonce() != "" {
subscription.Status.LastUpdateTime = util_proto.MustTimestampProto(core.Now())
if req.HasErrors() {
log.Info("config rejected")
subscription.Status.Total.ResponsesRejected++
subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesRejected++
} else {
log.V(1).Info("config accepted")
subscription.Status.Total.ResponsesAcknowledged++
subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesAcknowledged++
}
} else {
if !statusTrackerLog.V(1).Enabled() { // it was already added, no need to add it twice
log = log.WithValues("resourceNames", req.GetResourceNames())
}
log.Info("config requested")
}
return nil
}
// OnStreamResponse is called immediately prior to sending a response on a stream.
func (c *dataplaneStatusTracker) OnStreamResponse(streamID int64, req util_xds.DiscoveryRequest, resp util_xds.DiscoveryResponse) {
c.mu.RLock() // read access to the map of all ADS streams
defer c.mu.RUnlock()
state := c.streams[streamID]
state.mu.Lock() // write access to the per Dataplane info
defer state.mu.Unlock()
// update Dataplane status
subscription := state.subscription
subscription.Status.LastUpdateTime = util_proto.MustTimestampProto(core.Now())
subscription.Status.Total.ResponsesSent++
subscription.Status.StatsOf(resp.GetTypeUrl()).ResponsesSent++
log := statusTrackerLog.WithValues(
"proxyName", state.dataplaneId.Name,
"mesh", state.dataplaneId.Mesh,
"streamID", streamID,
"type", shortEnvoyType(req.GetTypeUrl()),
"resourceVersion", resp.VersionInfo(),
"requestedResourceNames", req.GetResourceNames(),
"resourceCount", len(resp.GetResources()),
)
if statusTrackerLog.V(1).Enabled() {
log = log.WithValues(
"subscriptionID", subscription.Id,
"nonce", resp.GetNonce(),
)
}
log.V(1).Info("config sent")
}
// To keep logs short, we want to log "Listeners" instead of full qualified Envoy type url name
func shortEnvoyType(typeURL string) string {
segments := strings.Split(typeURL, ".")
if len(segments) <= 1 {
return typeURL
}
return segments[len(segments)-1]
}
func (c *dataplaneStatusTracker) GetStatusAccessor(streamID int64) (SubscriptionStatusAccessor, bool) {
state, ok := c.streams[streamID]
return state, ok
}
var _ SubscriptionStatusAccessor = &streamState{}
func (s *streamState) GetStatus() (core_model.ResourceKey, *mesh_proto.DiscoverySubscription) {
s.mu.RLock() // read access to the per Dataplane info
defer s.mu.RUnlock()
return s.dataplaneId, proto.Clone(s.subscription).(*mesh_proto.DiscoverySubscription)
}
func (s *streamState) Close() {
close(s.stop)
}