blob: ea256827298b7daedac7e86375d65584b693831b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2021 gRPC authors.
*
*/
// Package pubsub implements a utility type to maintain resource watchers and
// the updates.
//
// This package is designed to work with the xds resources. It could be made a
// general system that works with all types.
package pubsub
import (
"sync"
"time"
)
import (
dubbogoLogger "github.com/dubbogo/gost/log/logger"
)
import (
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
"dubbo.apache.org/dubbo-go/v3/xds/utils/buffer"
"dubbo.apache.org/dubbo-go/v3/xds/utils/grpcsync"
)
// Pubsub maintains resource watchers and resource updates.
//
// There can be multiple watchers for the same resource. An update to a resource
// triggers updates to all the existing watchers. Watchers can be canceled at
// any time.
type Pubsub struct {
done *grpcsync.Event
logger dubbogoLogger.Logger
watchExpiryTimeout time.Duration
updateCh *buffer.Unbounded // chan *watcherInfoWithUpdate
// All the following maps are to keep the updates/metadata in a cache.
mu sync.Mutex
ldsWatchers map[string]map[*watchInfo]bool
ldsCache map[string]resource.ListenerUpdate
ldsMD map[string]resource.UpdateMetadata
rdsWatchers map[string]map[*watchInfo]bool
rdsCache map[string]resource.RouteConfigUpdate
rdsMD map[string]resource.UpdateMetadata
cdsWatchers map[string]map[*watchInfo]bool
cdsCache map[string]resource.ClusterUpdate
cdsMD map[string]resource.UpdateMetadata
edsWatchers map[string]map[*watchInfo]bool
edsCache map[string]resource.EndpointsUpdate
edsMD map[string]resource.UpdateMetadata
}
// New creates a new Pubsub.
func New(watchExpiryTimeout time.Duration, logger dubbogoLogger.Logger) *Pubsub {
pb := &Pubsub{
done: grpcsync.NewEvent(),
logger: logger,
watchExpiryTimeout: watchExpiryTimeout,
updateCh: buffer.NewUnbounded(),
ldsWatchers: make(map[string]map[*watchInfo]bool),
ldsCache: make(map[string]resource.ListenerUpdate),
ldsMD: make(map[string]resource.UpdateMetadata),
rdsWatchers: make(map[string]map[*watchInfo]bool),
rdsCache: make(map[string]resource.RouteConfigUpdate),
rdsMD: make(map[string]resource.UpdateMetadata),
cdsWatchers: make(map[string]map[*watchInfo]bool),
cdsCache: make(map[string]resource.ClusterUpdate),
cdsMD: make(map[string]resource.UpdateMetadata),
edsWatchers: make(map[string]map[*watchInfo]bool),
edsCache: make(map[string]resource.EndpointsUpdate),
edsMD: make(map[string]resource.UpdateMetadata),
}
go pb.run()
return pb
}
// WatchListener registers a watcher for the LDS resource.
//
// It also returns whether this is the first watch for this resource.
func (pb *Pubsub) WatchListener(serviceName string, cb func(resource.ListenerUpdate, error)) (first bool, cancel func() bool) {
wi := &watchInfo{
c: pb,
rType: resource.ListenerResource,
target: serviceName,
ldsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(pb.watchExpiryTimeout, func() {
wi.timeout()
})
return pb.watch(wi)
}
// WatchRouteConfig register a watcher for the RDS resource.
//
// It also returns whether this is the first watch for this resource.
func (pb *Pubsub) WatchRouteConfig(routeName string, cb func(resource.RouteConfigUpdate, error)) (first bool, cancel func() bool) {
wi := &watchInfo{
c: pb,
rType: resource.RouteConfigResource,
target: routeName,
rdsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(pb.watchExpiryTimeout, func() {
wi.timeout()
})
return pb.watch(wi)
}
// WatchCluster register a watcher for the CDS resource.
//
// It also returns whether this is the first watch for this resource.
func (pb *Pubsub) WatchCluster(clusterName string, cb func(resource.ClusterUpdate, error)) (first bool, cancel func() bool) {
wi := &watchInfo{
c: pb,
rType: resource.ClusterResource,
target: clusterName,
cdsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(pb.watchExpiryTimeout, func() {
wi.timeout()
})
return pb.watch(wi)
}
// WatchEndpoints registers a watcher for the EDS resource.
//
// It also returns whether this is the first watch for this resource.
func (pb *Pubsub) WatchEndpoints(clusterName string, cb func(resource.EndpointsUpdate, error)) (first bool, cancel func() bool) {
wi := &watchInfo{
c: pb,
rType: resource.EndpointsResource,
target: clusterName,
edsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(pb.watchExpiryTimeout, func() {
wi.timeout()
})
return pb.watch(wi)
}
// Close closes the pubsub.
func (pb *Pubsub) Close() {
if pb.done.HasFired() {
return
}
pb.done.Fire()
}
// run is a goroutine for all the callbacks.
//
// Callback can be called in watch(), if an item is found in cache. Without this
// goroutine, the callback will be called inline, which might cause a deadlock
// in user's code. Callbacks also cannot be simple `go callback()` because the
// order matters.
func (pb *Pubsub) run() {
for {
select {
case t := <-pb.updateCh.Get():
pb.updateCh.Load()
if pb.done.HasFired() {
return
}
pb.callCallback(t.(*watcherInfoWithUpdate))
case <-pb.done.Done():
return
}
}
}