blob: 2821ba8f5f7e7a1858712425d7a76ec42c007dae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2019 gRPC authors.
*
*/
// package client implements a full fledged gRPC client for the xDS API used
// by the xds resolver and balancer implementations.
package client
import (
"fmt"
"sync"
"time"
)
import (
dubbogoLogger "github.com/dubbogo/gost/log/logger"
_struct "github.com/golang/protobuf/ptypes/struct"
)
import (
"dubbo.apache.org/dubbo-go/v3/xds/client/bootstrap"
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
"dubbo.apache.org/dubbo-go/v3/xds/utils/grpcsync"
cache "dubbo.apache.org/dubbo-go/v3/xds/utils/xds_cache"
)
// clientImpl is the real implementation of the xds client. The exported Client
// is a wrapper of this struct with a ref count.
//
// Implements UpdateHandler interface.
// TODO(easwars): Make a wrapper struct which implements this interface in the
// style of ccBalancerWrapper so that the Client type does not implement these
// exported methods.
type clientImpl struct {
done *grpcsync.Event
config *bootstrap.Config
refreshMetadataCancel func()
// authorityMu protects the authority fields. It's necessary because an
// authority is created when it's used.
authorityMu sync.Mutex
// authorities is a map from ServerConfig to authority. So that
// different authorities sharing the same ServerConfig can share the
// authority.
//
// The key is **ServerConfig.String()**, not the authority name.
//
// An authority is either in authorities, or idleAuthorities,
// never both.
authorities map[string]*authority
// idleAuthorities keeps the authorities that are not used (the last
// watch on it was canceled). They are kept in the cache and will be deleted
// after a timeout. The key is ServerConfig.String().
//
// An authority is either in authorities, or idleAuthorities,
// never both.
idleAuthorities *cache.TimeoutCache
logger dubbogoLogger.Logger
watchExpiryTimeout time.Duration
}
// newWithConfig returns a new xdsClient with the given config.
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (_ *clientImpl, retErr error) {
c := &clientImpl{
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
authorities: make(map[string]*authority),
idleAuthorities: cache.NewTimeoutCache(idleAuthorityDeleteTimeout),
}
defer func() {
if retErr != nil {
c.Close()
}
}()
c.logger = dubbogoLogger.GetLogger()
c.logger.Infof("Created ClientConn to xDS management server: %s", config.XDSServer)
c.logger.Infof("Created")
return c, nil
}
func (c *clientImpl) SetMetadata(m *_struct.Struct) error {
a, _, err := c.findAuthority(resource.ParseName(""))
if err != nil {
return err
}
if err := a.SetMetadata(m); err != nil {
return err
}
a.watchEndpoints("", func(update resource.EndpointsUpdate, err error) {})
return nil
}
// BootstrapConfig returns the configuration read from the bootstrap file.
// Callers must treat the return value as read-only.
func (c *clientRefCounted) BootstrapConfig() *bootstrap.Config {
return c.config
}
// Close closes the gRPC connection to the management server.
func (c *clientImpl) Close() {
if c.done.HasFired() {
return
}
c.done.Fire()
// TODO: Should we invoke the registered callbacks here with an error that
// the client is closed?
// Note that Close needs to check for nils even if some of them are always
// set in the constructor. This is because the constructor defers Close() in
// error cases, and the fields might not be set when the error happens.
c.authorityMu.Lock()
for _, a := range c.authorities {
a.close()
}
c.idleAuthorities.Clear(true)
c.authorityMu.Unlock()
c.logger.Infof("Shutdown")
}
func (c *clientImpl) filterChainUpdateValidator(fc *resource.FilterChain) error {
if fc == nil {
return nil
}
return c.securityConfigUpdateValidator(fc.SecurityCfg)
}
func (c *clientImpl) securityConfigUpdateValidator(sc *resource.SecurityConfig) error {
if sc == nil {
return nil
}
if sc.IdentityInstanceName != "" {
if _, ok := c.config.CertProviderConfigs[sc.IdentityInstanceName]; !ok {
return fmt.Errorf("identitiy certificate provider instance name %q missing in bootstrap configuration", sc.IdentityInstanceName)
}
}
if sc.RootInstanceName != "" {
if _, ok := c.config.CertProviderConfigs[sc.RootInstanceName]; !ok {
return fmt.Errorf("root certificate provider instance name %q missing in bootstrap configuration", sc.RootInstanceName)
}
}
return nil
}
func (c *clientImpl) updateValidator(u interface{}) error {
switch update := u.(type) {
case resource.ListenerUpdate:
if update.InboundListenerCfg == nil || update.InboundListenerCfg.FilterChains == nil {
return nil
}
return update.InboundListenerCfg.FilterChains.Validate(c.filterChainUpdateValidator)
case resource.ClusterUpdate:
return c.securityConfigUpdateValidator(update.SecurityCfg)
default:
// We currently invoke this update validation function only for LDS and
// CDS updates. In the future, if we wish to invoke it for other xDS
// updates, corresponding plumbing needs to be added to those unmarshal
// functions.
}
return nil
}