blob: 75f5d1489f4a162f9c0d147e19f5ce7bc3b31e7c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2021 gRPC authors.
*
*/
package pubsub
import (
"fmt"
"sync"
"time"
)
import (
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
"dubbo.apache.org/dubbo-go/v3/xds/utils/pretty"
)
// watchInfoState describes where a single watcher is in its lifecycle.
type watchInfoState int

// Lifecycle states of a watchInfo. The numeric values start at zero, so a
// zero-valued watchInfo is in watchInfoStateStarted.
const (
	// watchInfoStateStarted: no response, timeout or cancellation has been
	// recorded yet.
	watchInfoStateStarted = watchInfoState(iota)
	// watchInfoStateRespReceived: an update, an error or a "resource not
	// found" result has been delivered to the watcher.
	watchInfoStateRespReceived
	// watchInfoStateTimeout: the watcher timed out before any response was
	// recorded.
	watchInfoStateTimeout
	// watchInfoStateCanceled: the watcher was canceled; nothing more is
	// scheduled for it.
	watchInfoStateCanceled
)
// watchInfo holds all the information from a watch() call.
//
// One watchInfo describes a single watcher of the resource identified by
// (rType, target). Its state field is advanced by the newUpdate, newError,
// resourceNotFound, timeout and cancel methods.
type watchInfo struct {
// c is the Pubsub whose scheduleCallback() is used to deliver updates and
// errors for this watcher.
c *Pubsub
// rType is the kind of resource being watched.
rType resource.ResourceType
// target is the name of the watched resource.
target string
// Per-resource-type callbacks. Presumably only the one matching rType is
// set; the dispatch code is not part of this section — TODO confirm.
ldsCallback func(resource.ListenerUpdate, error)
rdsCallback func(resource.RouteConfigUpdate, error)
cdsCallback func(resource.ClusterUpdate, error)
edsCallback func(resource.EndpointsUpdate, error)
// expiryTimer is stopped as soon as an update or error is delivered, or
// when the watch is canceled.
// NOTE(review): it is presumably the timer that fires timeout(); it is
// created outside this section — confirm.
expiryTimer *time.Timer
// mu protects state, and c.scheduleCallback().
// - No callback should be scheduled after the watchInfo is canceled.
// - No timeout error should be scheduled after a response has been
//   received.
mu sync.Mutex
state watchInfoState
}
// newUpdate hands a received resource update to this watcher.
//
// If the watch has already been canceled the update is dropped. Otherwise the
// watch is marked as having received a response, the expiry timer is stopped
// and the callback is scheduled with update and a nil error.
func (wi *watchInfo) newUpdate(update interface{}) {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if wi.state != watchInfoStateCanceled {
		wi.state = watchInfoStateRespReceived
		wi.expiryTimer.Stop()
		wi.c.scheduleCallback(wi, update, nil)
	}
}
// newError reports err to this watcher.
//
// A canceled watch ignores the error. For any other state the watch is marked
// as having received a response, the expiry timer is stopped and the error is
// scheduled for delivery together with an empty update of the watched type.
func (wi *watchInfo) newError(err error) {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	switch wi.state {
	case watchInfoStateCanceled:
		// Nothing may be scheduled once the watch is canceled.
		return
	default:
		wi.state = watchInfoStateRespReceived
		wi.expiryTimer.Stop()
		wi.sendErrorLocked(err)
	}
}
// resourceNotFound tells this watcher that its target was absent from a
// received response.
//
// A canceled watch ignores the notification. Otherwise the watch is marked as
// having received a response, the expiry timer is stopped and an error of type
// resource.ErrorTypeResourceNotFound is scheduled for delivery.
func (wi *watchInfo) resourceNotFound() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if canceled := wi.state == watchInfoStateCanceled; canceled {
		return
	}

	wi.state = watchInfoStateRespReceived
	wi.expiryTimer.Stop()

	notFound := resource.NewErrorf(
		resource.ErrorTypeResourceNotFound,
		"xds: %v target %s not found in received response",
		wi.rType, wi.target,
	)
	wi.sendErrorLocked(notFound)
}
// timeout reports a watcher timeout error to this watcher.
//
// Nothing happens if the watch was canceled or a response has already been
// recorded. Otherwise the state becomes watchInfoStateTimeout and a timeout
// error naming the resource type and target is scheduled for delivery.
func (wi *watchInfo) timeout() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	switch wi.state {
	case watchInfoStateCanceled, watchInfoStateRespReceived:
		// Either nobody is listening any more, or a response already arrived.
		return
	}

	wi.state = watchInfoStateTimeout
	expired := fmt.Errorf("xds: %v target %s not found, watcher timeout", wi.rType, wi.target)
	wi.sendErrorLocked(expired)
}
// sendErrorLocked schedules a callback carrying err together with an empty
// update value of the type that matches wi.rType.
//
// Caller must hold wi.mu.
func (wi *watchInfo) sendErrorLocked(err error) {
	// empty stays nil when rType is none of the four types handled below.
	var empty interface{}

	switch wi.rType {
	case resource.ListenerResource:
		empty = resource.ListenerUpdate{}
	case resource.RouteConfigResource:
		empty = resource.RouteConfigUpdate{}
	case resource.ClusterResource:
		empty = resource.ClusterUpdate{}
	case resource.EndpointsResource:
		empty = resource.EndpointsUpdate{}
	}

	wi.c.scheduleCallback(wi, empty, err)
}
// cancel marks the watch as canceled and stops its expiry timer.
//
// Calling cancel on an already canceled watch is a no-op. After cancel, the
// other watchInfo methods no longer schedule callbacks for this watcher.
func (wi *watchInfo) cancel() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if wi.state != watchInfoStateCanceled {
		wi.expiryTimer.Stop()
		wi.state = watchInfoStateCanceled
	}
}
// watch registers wi as a watcher of the resource (wi.rType, wi.target).
//
// first is true when no other watcher was registered for that type and name;
// in that case the resource's metadata is initialized with status
// resource.ServiceStatusRequested. If the resource is already present in the
// cache, wi is immediately given the cached value through newUpdate.
//
// The returned cancel function cancels wi and removes it from the watcher
// set. It returns true when wi was the last watcher of the resource, in which
// case the watcher set, the metadata and the cached value for that resource
// are removed as well.
//
// For an unknown resource type an error is logged and (false, nil) is
// returned.
func (pb *Pubsub) watch(wi *watchInfo) (first bool, cancel func() bool) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	pb.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)

	// Pick the watcher registry and metadata table that belong to this
	// resource type.
	var (
		registry map[string]map[*watchInfo]bool
		metadata map[string]resource.UpdateMetadata
	)
	switch wi.rType {
	case resource.ListenerResource:
		registry, metadata = pb.ldsWatchers, pb.ldsMD
	case resource.RouteConfigResource:
		registry, metadata = pb.rdsWatchers, pb.rdsMD
	case resource.ClusterResource:
		registry, metadata = pb.cdsWatchers, pb.cdsMD
	case resource.EndpointsResource:
		registry, metadata = pb.edsWatchers, pb.edsMD
	default:
		pb.logger.Errorf("unknown watch type: %v", wi.rType)
		return false, nil
	}

	name := wi.target
	set, known := registry[name]
	if !known {
		// Nobody is watching this (type, name) pair yet: create its watcher
		// set and record the resource as requested. Reporting first == true
		// lets the lower level send a new request with the resource name; an
		// already watched resource does not notify the underlying versioned
		// apiClient again.
		pb.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target)
		set = make(map[*watchInfo]bool)
		registry[name] = set
		metadata[name] = resource.UpdateMetadata{Status: resource.ServiceStatusRequested}
	}
	// Always add the watcher to the set, so that its callback is invoked for
	// new responses.
	set[wi] = true

	// Replay the cached value, if any, to the new watcher.
	switch wi.rType {
	case resource.ListenerResource:
		if cached, hit := pb.ldsCache[name]; hit {
			pb.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(cached))
			wi.newUpdate(cached)
		}
	case resource.RouteConfigResource:
		if cached, hit := pb.rdsCache[name]; hit {
			pb.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(cached))
			wi.newUpdate(cached)
		}
	case resource.ClusterResource:
		// Cluster watchers are first given the "*" cache entry, then the
		// entry stored under their own name.
		if cached, hit := pb.cdsCache["*"]; hit {
			pb.logger.Debugf("CDS resource with name * found in cache: %+v", pretty.ToJSON(cached))
			wi.newUpdate(cached)
		}
		if cached, hit := pb.cdsCache[name]; hit {
			pb.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(cached))
			wi.newUpdate(cached)
		}
	case resource.EndpointsResource:
		if cached, hit := pb.edsCache[name]; hit {
			pb.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(cached))
			wi.newUpdate(cached)
		}
	}

	return !known, func() bool {
		pb.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target)
		wi.cancel()

		pb.mu.Lock()
		defer pb.mu.Unlock()

		remaining := registry[name]
		if remaining == nil {
			return false
		}

		// Drop this watcher, so that its callback is not called in the
		// future.
		delete(remaining, wi)
		if len(remaining) != 0 {
			return false
		}

		// That was the last watcher: forget the resource entirely so that the
		// caller can also tell the xDS client to stop watching it.
		pb.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target)
		delete(registry, name)
		delete(metadata, name)

		// Evict the cached value as well. A later watch for this resource
		// then triggers an xDS request with the resource name, and the client
		// receives fresh xDS responses.
		switch wi.rType {
		case resource.ListenerResource:
			delete(pb.ldsCache, name)
		case resource.RouteConfigResource:
			delete(pb.rdsCache, name)
		case resource.ClusterResource:
			delete(pb.cdsCache, name)
		case resource.EndpointsResource:
			delete(pb.edsCache, name)
		}
		return true
	}
}