blob: a927418c8eb0aa0f6bb56390c218e0ea408aae80 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v3
import (
"context"
"sync"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3"
)
import (
util_watchdog "github.com/apache/dubbo-kubernetes/pkg/util/watchdog"
)
// NewNodeWatchdogFunc builds the watchdog for a single xDS stream. It is
// invoked when the first request on a stream is processed, with the context
// the stream was opened with, the node taken from that request, and the
// stream ID. Returning a nil Watchdog together with a nil error means that no
// watchdog is started for the stream.
type NewNodeWatchdogFunc func(ctx context.Context, node *envoy_core.Node, streamId int64) (util_watchdog.Watchdog, error)
// NewWatchdogCallbacks returns xDS server callbacks that track every open
// stream and use newNodeWatchdog to create a watchdog for a stream when its
// first request arrives.
func NewWatchdogCallbacks(newNodeWatchdog NewNodeWatchdogFunc) envoy_xds.Callbacks {
	callbacks := new(watchdogCallbacks)
	callbacks.newNodeWatchdog = newNodeWatchdog
	// The map must be allocated up front: the stream callbacks write to it.
	callbacks.streams = map[int64]watchdogStreamState{}
	return callbacks
}
// watchdogCallbacks keeps per-stream state for xDS streams and starts a
// watchdog for each stream once its first request has been received.
type watchdogCallbacks struct {
// NOTE(review): presumably supplies no-op defaults for the callbacks that are
// not overridden in this file — confirm against the NoopCallbacks definition.
NoopCallbacks
// newNodeWatchdog creates the watchdog for a stream on its first request.
newNodeWatchdog NewNodeWatchdogFunc
mu sync.RWMutex // protects access to the fields below
// streams holds the state of every registered stream, keyed by stream ID.
streams map[int64]watchdogStreamState
}
// watchdogStreamState is the bookkeeping stored per stream in
// watchdogCallbacks.streams.
type watchdogStreamState struct {
// context is the context the stream was opened with; it is passed to the
// watchdog factory when the first request on the stream is processed.
context context.Context
// cancel closes the stop channel handed to the stream's watchdog. It is nil
// until the first request on the stream has been processed.
cancel context.CancelFunc
}
// Compile-time check that *watchdogCallbacks implements envoy_xds.Callbacks.
var _ envoy_xds.Callbacks = (*watchdogCallbacks)(nil)
// RestStreamID is the pseudo stream ID under which all non-streaming REST
// (fetch) requests share a single stream state.
const RestStreamID = int64(-1)
// hasStream reports whether a state entry is currently registered for streamID.
func (cb *watchdogCallbacks) hasStream(streamID int64) bool {
	cb.mu.RLock() // a lookup only needs read access to the stream map
	_, registered := cb.streams[streamID]
	cb.mu.RUnlock()
	return registered
}
// OnFetchRequest is called for each non-streaming REST request. All REST
// requests share one pseudo stream, RestStreamID: the first request registers
// it (storing that request's context as the stream context) and is then
// processed like the first request of a regular stream; later requests return
// nil without doing anything.
func (cb *watchdogCallbacks) OnFetchRequest(ctx context.Context, req *envoy_discovery.DiscoveryRequest) error {
	// Fast path: once the REST stream exists a read lock is enough.
	if cb.hasStream(RestStreamID) {
		return nil
	}

	// Open up a new "stream" state, which all REST requests use, if one doesn't
	// already exist. The existence check and the registration happen in the
	// same critical section: with separate lock acquisitions two concurrent
	// fetches could both register the stream, and the second registration
	// would wipe the cancel function stored by the first, leaking its watchdog.
	// The entry written here is the same one OnStreamOpen stores.
	cb.mu.Lock()
	_, exists := cb.streams[RestStreamID]
	if !exists {
		cb.streams[RestStreamID] = watchdogStreamState{context: ctx}
	}
	cb.mu.Unlock()
	if exists {
		return nil
	}

	// TODO: could also register a TTL on the REST stream to clean it up if there is no activity over a certain period,
	// since it will currently never be closed once opened
	return cb.OnStreamRequest(RestStreamID, req)
}
// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
//
// It records the stream's context under streamID, replacing any existing
// entry for that ID, and always returns nil. typ is not used.
func (cb *watchdogCallbacks) OnStreamOpen(ctx context.Context, streamID int64, typ string) error {
	// The watchdog itself is only created on the first request, so the fresh
	// state carries the context and no cancel function.
	fresh := watchdogStreamState{context: ctx}

	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.streams[streamID] = fresh
	return nil
}
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
//
// It invokes the stream's cancel function, if one has been set, and removes
// the stream's entry from the map. node is not used.
func (cb *watchdogCallbacks) OnStreamClosed(streamID int64, node *envoy_core.Node) {
	cb.mu.Lock()
	// One deferred cleanup: drop the entry first, then release the lock.
	defer func() {
		delete(cb.streams, streamID)
		cb.mu.Unlock()
	}()

	stop := cb.streams[streamID].cancel
	if stop == nil {
		// No request was ever processed on this stream, so there is nothing to stop.
		return
	}
	stop()
}
// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
//
// The first request on a stream creates the stream's stop channel, stores the
// function that closes it, and asks newNodeWatchdog for a watchdog; a non-nil
// watchdog is started in its own goroutine. Every later request on the same
// stream returns nil immediately. An error from newNodeWatchdog is returned
// unchanged.
func (cb *watchdogCallbacks) OnStreamRequest(streamID int64, req *envoy_discovery.DiscoveryRequest) error {
	// Fast path: a read lock is enough to see that the stream is already set up.
	cb.mu.RLock()
	state := cb.streams[streamID]
	cb.mu.RUnlock()
	if state.cancel != nil {
		return nil
	}

	cb.mu.Lock() // write access to the map of all ADS streams
	defer cb.mu.Unlock()

	// Re-validate under the write lock. Between the RUnlock above and this
	// Lock another request for the same stream ID (e.g. concurrent REST
	// fetches sharing RestStreamID) may have completed the setup. Without this
	// check a second watchdog would be started and the first stop channel
	// would become unreachable, leaking its goroutine.
	if current, ok := cb.streams[streamID]; ok {
		if current.cancel != nil {
			return nil
		}
		state = current
	}

	// create a stop channel even if there won't be an actual watchdog
	stopCh := make(chan struct{})
	state.cancel = func() {
		close(stopCh)
	}
	// The cancel function is stored before the watchdog is created, so the
	// stream counts as set up even when newNodeWatchdog fails below.
	cb.streams[streamID] = state

	runnable, err := cb.newNodeWatchdog(state.context, req.Node, streamID)
	if err != nil {
		return err
	}
	if runnable != nil {
		// kick off watchdog for that stream
		go runnable.Start(stopCh)
	}
	return nil
}
// OnDeltaStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnDeltaStreamClosed will still be called.
//
// It records the stream's context under streamID, replacing any existing
// entry for that ID, and always returns nil. typ is not used.
func (cb *watchdogCallbacks) OnDeltaStreamOpen(ctx context.Context, streamID int64, typ string) error {
	// The watchdog itself is only created on the first request, so the fresh
	// state carries the context and no cancel function.
	fresh := watchdogStreamState{context: ctx}

	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.streams[streamID] = fresh
	return nil
}
// OnDeltaStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
//
// It invokes the stream's cancel function, if one has been set, and removes
// the stream's entry from the map. node is not used.
func (cb *watchdogCallbacks) OnDeltaStreamClosed(streamID int64, node *envoy_core.Node) {
	cb.mu.Lock()
	// One deferred cleanup: drop the entry first, then release the lock.
	defer func() {
		delete(cb.streams, streamID)
		cb.mu.Unlock()
	}()

	stop := cb.streams[streamID].cancel
	if stop == nil {
		// No request was ever processed on this stream, so there is nothing to stop.
		return
	}
	stop()
}
// OnStreamDeltaRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnDeltaStreamClosed will still be called.
//
// The first request on a stream creates the stream's stop channel, stores the
// function that closes it, and asks newNodeWatchdog for a watchdog; a non-nil
// watchdog is started in its own goroutine. Every later request on the same
// stream returns nil immediately. An error from newNodeWatchdog is returned
// unchanged.
func (cb *watchdogCallbacks) OnStreamDeltaRequest(streamID int64, req *envoy_discovery.DeltaDiscoveryRequest) error {
	// Fast path: a read lock is enough to see that the stream is already set up.
	cb.mu.RLock()
	state := cb.streams[streamID]
	cb.mu.RUnlock()
	if state.cancel != nil {
		return nil
	}

	cb.mu.Lock() // write access to the map of all ADS streams
	defer cb.mu.Unlock()

	// Re-validate under the write lock. Between the RUnlock above and this
	// Lock another request for the same stream ID may have completed the
	// setup. Without this check a second watchdog would be started and the
	// first stop channel would become unreachable, leaking its goroutine.
	if current, ok := cb.streams[streamID]; ok {
		if current.cancel != nil {
			return nil
		}
		state = current
	}

	// create a stop channel even if there won't be an actual watchdog
	stopCh := make(chan struct{})
	state.cancel = func() {
		close(stopCh)
	}
	// The cancel function is stored before the watchdog is created, so the
	// stream counts as set up even when newNodeWatchdog fails below.
	cb.streams[streamID] = state

	runnable, err := cb.newNodeWatchdog(state.context, req.Node, streamID)
	if err != nil {
		return err
	}
	if runnable != nil {
		// kick off watchdog for that stream
		go runnable.Start(stopCh)
	}
	return nil
}