blob: 6e9e5394f4838c85d7d474d4576abf897da40e70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2020 gRPC authors.
*
*/
// Package resolver implements the xds resolver, which performs LDS and RDS
// queries to find the cluster to use.
package resolver
import (
"errors"
"fmt"
"strings"
)
import (
dubbogoLogger "github.com/dubbogo/gost/log/logger"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/resolver"
)
import (
"dubbo.apache.org/dubbo-go/v3/xds/client"
"dubbo.apache.org/dubbo-go/v3/xds/client/bootstrap"
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
"dubbo.apache.org/dubbo-go/v3/xds/utils/grpcsync"
"dubbo.apache.org/dubbo-go/v3/xds/utils/pretty"
iresolver "dubbo.apache.org/dubbo-go/v3/xds/utils/resolver"
)
// xdsScheme is the URI scheme reported by the xds resolver builder's Scheme
// method.
const xdsScheme = "xds"
// NewBuilder returns an xds resolver builder whose xds client is created from
// the supplied bootstrap contents rather than through the package-level
// newXDSClient hook. This allows tests to run several xds clients, each in its
// own ClientConn, at the same time.
//
// The returned error is always nil.
func NewBuilder(config []byte) (resolver.Builder, error) {
	// The closure captures config so that every Build call on the returned
	// builder creates its client from the same bootstrap contents.
	fromContents := func() (client.XDSClient, error) {
		return client.NewClientWithBootstrapContents(config)
	}
	b := &xdsResolverBuilder{newXDSClient: fromContents}
	return b, nil
}
// newXDSClient is the default xds client factory; Build falls back to it when
// the builder does not carry a factory of its own. It is a variable so that
// it can be overridden in unit tests.
var newXDSClient = func() (client.XDSClient, error) { return client.New() }
// init registers a default xds resolver builder (one without a custom client
// factory) through resolver.Register.
func init() {
	resolver.Register(new(xdsResolverBuilder))
}
// xdsResolverBuilder builds xds resolvers. It is the type registered in init
// and returned by NewBuilder.
type xdsResolverBuilder struct {
// newXDSClient, when non-nil, is used by Build to create the xds client in
// place of the package-level newXDSClient.
newXDSClient func() (client.XDSClient, error)
}
// Build helps implement the resolver.Builder interface.
//
// The xds bootstrap process is performed (and a new xds client is built) every
// time an xds resolver is built. Build then checks the bootstrap configuration
// against the dial credentials, derives the listener resource name for the
// dial target, registers a watch for it on the xds client and starts the run
// goroutine that forwards updates to cc.
//
// On any error the partially constructed resolver is closed before returning,
// which also closes the xds client if one was already created. The error from
// the client factory is wrapped, so callers can inspect it with errors.Is and
// errors.As.
func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) {
	r := &xdsResolver{
		target:         t,
		cc:             cc,
		closed:         grpcsync.NewEvent(),
		updateCh:       make(chan suWithError, 1),
		activeClusters: make(map[string]*clusterInfo),
	}
	// Close tolerates unset fields, so it is safe to call it from any of the
	// error returns below.
	defer func() {
		if retErr != nil {
			r.Close()
		}
	}()
	r.logger = dubbogoLogger.GetLogger()
	r.logger.Infof("Creating resolver for target: %+v", t)

	// Prefer the builder's own factory (set by NewBuilder) over the
	// package-level default.
	newClient := newXDSClient
	if b.newXDSClient != nil {
		newClient = b.newXDSClient
	}
	xdsC, err := newClient()
	if err != nil {
		return nil, fmt.Errorf("xds: failed to create xds-client: %w", err)
	}
	r.client = xdsC

	bootstrapConfig := xdsC.BootstrapConfig()
	if bootstrapConfig == nil {
		return nil, errors.New("bootstrap configuration is empty")
	}

	// If xds credentials were specified by the user, but bootstrap configs do
	// not contain any certificate provider configuration, it is better to fail
	// right now rather than failing when attempting to create certificate
	// providers after receiving an CDS response with security configuration.
	var creds credentials.TransportCredentials
	switch {
	case opts.DialCreds != nil:
		creds = opts.DialCreds
	case opts.CredsBundle != nil:
		creds = opts.CredsBundle.TransportCredentials()
	}
	if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
		if len(bootstrapConfig.CertProviderConfigs) == 0 {
			return nil, errors.New("xds: xdsCreds specified but certificate_providers config missing in bootstrap file")
		}
	}

	// Find the client listener template to use from the bootstrap config:
	// - If authority is not set in the target, use the top level template
	// - If authority is set, use the template from the authority map.
	template := bootstrapConfig.ClientDefaultListenerResourceNameTemplate
	if authority := r.target.URL.Host; authority != "" {
		a := bootstrapConfig.Authorities[authority]
		if a == nil {
			return nil, fmt.Errorf("xds: authority %q is not found in the bootstrap file", authority)
		}
		if a.ClientListenerResourceNameTemplate != "" {
			// This check will never be false, because
			// ClientListenerResourceNameTemplate is required to start with
			// xdstp://, and has a default value (not an empty string) if unset.
			template = a.ClientListenerResourceNameTemplate
		}
	}

	// The endpoint is the URL path, or the opaque part when the path is
	// empty, with the leading slash removed.
	endpoint := r.target.URL.Path
	if endpoint == "" {
		endpoint = r.target.URL.Opaque
	}
	endpoint = strings.TrimPrefix(endpoint, "/")
	resourceName := bootstrap.PopulateResourceTemplate(template, endpoint)

	// Register a watch on the xdsClient for the user's dial target.
	cancelWatch := watchService(r.client, resourceName, r.handleServiceUpdate, r.logger)
	r.logger.Infof("Watch started on resource name %v with xds-client %p", r.target.Endpoint, r.client)
	r.cancelWatch = func() {
		cancelWatch()
		r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.target.Endpoint, r.client)
	}

	go r.run()
	return r, nil
}
// Scheme helps implement the resolver.Builder interface. It returns the
// package constant xdsScheme.
func (*xdsResolverBuilder) Scheme() string {
return xdsScheme
}
// suWithError wraps the ServiceUpdate and error received through a watch API
// callback, so that it can be pushed onto the update channel as a single
// entity.
type suWithError struct {
// su is the service update delivered by the watch callback.
su serviceUpdate
// emptyUpdate, when true, makes the run goroutine resend the service config
// with the current config selector instead of processing su.
emptyUpdate bool
// err is the error delivered by the watch callback. When it is non-nil the
// run goroutine handles the error and does not look at the other fields.
err error
}
// xdsResolver implements the resolver.Resolver interface.
//
// It registers a watcher for ServiceConfig updates with the xdsClient object
// (which performs LDS/RDS queries for the same), and passes the received
// updates to the ClientConn.
type xdsResolver struct {
// target is the dial target this resolver was built for.
target resolver.Target
// cc is the ClientConn that receives state updates and errors.
cc resolver.ClientConn
// closed is fired by Close; once fired, the run goroutine returns and
// handleServiceUpdate drops further updates.
closed *grpcsync.Event
// logger is set in Build from dubbogoLogger.GetLogger.
logger dubbogoLogger.Logger
// The underlying xdsClient which performs all xDS requests and responses.
client client.XDSClient
// A channel for the watch API callback to write service updates on to. The
// updates are read by the run goroutine and passed on to the ClientConn.
// Build creates it with a buffer of one.
updateCh chan suWithError
// cancelWatch is the function to cancel the watcher. It stays nil until
// Build has registered the watch.
cancelWatch func()
// activeClusters maps a cluster name to its clusterInfo (declared elsewhere
// in this package; presumably it carries the reference count — confirm).
// Only read or written during a service update (synchronous).
activeClusters map[string]*clusterInfo
// curConfigSelector is the config selector currently in use. The run
// goroutine replaces it after each successfully sent update and resets it
// to nil when the watched resource is reported as not found.
curConfigSelector *configSelector
}
// sendNewServiceConfig pushes a fresh resolver state to the ClientConn.
//
// It prunes r.activeClusters, generates a service config from the clusters
// that remain, and sends that config to the ClientConn together with the
// given config selector and the xds client. When cs is nil and no clusters
// remain, an empty service config is sent instead.
//
// It returns false only when the service config cannot be generated; in that
// case the error is logged and reported to the ClientConn and no state update
// is sent. In every other case it returns true.
func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
	// Drop zero-reference entries first; otherwise serviceConfigJSON would
	// include them in the generated config.
	r.pruneActiveClusters()

	if noClusters := len(r.activeClusters) == 0; noClusters && cs == nil {
		// There are no clusters and we are sending a failing configSelector.
		// Send an empty config, which picks pick-first, with no address, and
		// puts the ClientConn into transient failure.
		emptyCfg := r.cc.ParseServiceConfig("{}")
		r.cc.UpdateState(resolver.State{ServiceConfig: emptyCfg})
		return true
	}

	rawCfg, err := serviceConfigJSON(r.activeClusters)
	if err != nil {
		// JSON marshal error; should never happen.
		r.logger.Errorf("%v", err)
		r.cc.ReportError(err)
		return false
	}
	r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.target.Endpoint, r.client, pretty.FormatJSON(rawCfg))

	// Assemble the state step by step: parsed config, then the config
	// selector, then the xds client, and hand the result to the ClientConn.
	parsedCfg := r.cc.ParseServiceConfig(string(rawCfg))
	withSelector := iresolver.SetConfigSelector(resolver.State{ServiceConfig: parsedCfg}, cs)
	withClient := client.SetClient(withSelector, r.client)
	r.cc.UpdateState(withClient)
	return true
}
// run is the resolver's long-running goroutine. It blocks until either the
// resolver is closed, in which case it returns, or an update arrives on
// r.updateCh, which it then translates into calls on the ClientConn.
//
// Each update is handled as exactly one of: a watch error, an empty update,
// or a regular service update.
func (r *xdsResolver) run() {
	for {
		select {
		case <-r.closed.Done():
			return
		case u := <-r.updateCh:
			switch {
			case u.err != nil:
				r.logger.Warnf("Watch error on resource %v from xds-client %p, %v", r.target.Endpoint, r.client, u.err)
				if resource.ErrType(u.err) != resource.ErrorTypeResourceNotFound {
					// Any other error is only reported to the ClientConn
					// and balancers; the resolver state stays untouched so
					// the old config keeps being used.
					r.cc.ReportError(u.err)
					continue
				}
				// Resource-not-found means the LDS resource was removed.
				// Ultimately an empty service config is sent, which picks
				// pick-first, with no address, and puts the ClientConn into
				// transient failure. Before that can happen, a normal
				// service config may have to be sent along with an erroring
				// (nil) config selector.
				r.sendNewServiceConfig(nil)
				// Stop and dereference the active config selector, if any.
				// NOTE(review): stop is called even when curConfigSelector
				// is nil, so it presumably tolerates a nil receiver —
				// confirm against its definition.
				r.curConfigSelector.stop()
				r.curConfigSelector = nil

			case u.emptyUpdate:
				// Resend the service config with the selector already in
				// use.
				r.sendNewServiceConfig(r.curConfigSelector)

			default:
				// Build the config selector for this update.
				next, err := r.newConfigSelector(u.su)
				if err != nil {
					r.logger.Warnf("Error parsing update on resource %v from xds-client %p: %v", r.target.Endpoint, r.client, err)
					r.cc.ReportError(err)
					continue
				}
				if sent := r.sendNewServiceConfig(next); !sent {
					// JSON error creating the service config (unexpected):
					// discard the new selector, ignore this update and carry
					// on with the previous selector.
					next.stop()
					continue
				}
				// Release the references held by the old selector and make
				// the new one current.
				r.curConfigSelector.stop()
				r.curConfigSelector = next
			}
		}
	}
}
// handleServiceUpdate is the watch callback for service updates. It packs the
// update and its error into a single value and places it on r.updateCh, where
// the run goroutine picks it up. An update that is still waiting in the
// channel is replaced by the new one.
func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) {
	// Once the resolver is closed, nothing may reach the ClientConn any more.
	if r.closed.HasFired() {
		return
	}
	pending := suWithError{su: su, err: err}
	// Discard an unconsumed update, if there is one, so that the newest
	// update takes its place.
	select {
	case <-r.updateCh:
	default:
	}
	r.updateCh <- pending
}
// ResolveNow is a no-op at this point: the method body is empty and the
// options argument is ignored.
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
// Close shuts the resolver down: it cancels the watch, closes the underlying
// xds client, fires the closed event (which ends the run goroutine) and logs
// the shutdown.
//
// cancelWatch and client are nil-checked even though a successful Build always
// sets them, because Build defers Close on its error paths, where these fields
// may not have been assigned yet.
func (r *xdsResolver) Close() {
	if cancel := r.cancelWatch; cancel != nil {
		cancel()
	}
	if c := r.client; c != nil {
		c.Close()
	}
	r.closed.Fire()
	r.logger.Infof("Shutdown")
}