blob: 53dce7ac3ab319c1869bab5d079b9ae277de025f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"container/list"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
"time"
)
import (
"github.com/creasty/defaults"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
)
// ServiceConfig is the configuration of the service provider
type ServiceConfig struct {
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol []string `default:"[\"dubbo\"]" validate:"required" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
Interface string `validate:"required" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry []string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Auth string `yaml:"auth" json:"auth,omitempty" property:"auth"`
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
GrpcMaxMessageSize int `default:"4" yaml:"max_message_size" json:"max_message_size,omitempty"`
Protocols map[string]*ProtocolConfig
Registries map[string]*RegistryConfig
ProxyFactoryKey string
unexported *atomic.Bool
exported *atomic.Bool
export bool // a flag to control whether the current service should export or not
rpcService common.RPCService
cacheMutex sync.Mutex
cacheProtocol protocol.Protocol
exportersLock sync.Mutex
exporters []protocol.Exporter
metadataType string
}
// Prefix returns dubbo.service.${InterfaceName}.
func (svc *ServiceConfig) Prefix() string {
return strings.Join([]string{constant.ServiceConfigPrefix, svc.id}, ".")
}
func (svc *ServiceConfig) Init(rc *RootConfig) error {
if err := initProviderMethodConfig(svc); err != nil {
return err
}
if err := defaults.Set(svc); err != nil {
return err
}
svc.exported = atomic.NewBool(false)
svc.metadataType = rc.Application.MetadataType
svc.unexported = atomic.NewBool(false)
svc.Registries = rc.Registries
svc.Protocols = rc.Protocols
if rc.Provider != nil {
svc.ProxyFactoryKey = rc.Provider.ProxyFactory
}
svc.Registry = translateRegistryIds(svc.Registry)
if len(svc.Registry) <= 0 {
svc.Registry = rc.Provider.Registry
}
svc.export = true
return verify(svc)
}
// InitExported will set exported as false atom bool
func (svc *ServiceConfig) InitExported() {
svc.exported = atomic.NewBool(false)
}
// IsExport will return whether the service config is exported or not
func (svc *ServiceConfig) IsExport() bool {
return svc.exported.Load()
}
// Get Random Port
func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
ports := list.New()
for _, proto := range protocolConfigs {
if len(proto.Port) > 0 {
continue
}
tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
}
defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
}
return ports
}
// Export exports the service
func (svc *ServiceConfig) Export() error {
// TODO: delay export
if svc.unexported != nil && svc.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported!", svc.Interface)
logger.Errorf(err.Error())
return err
}
if svc.unexported != nil && svc.exported.Load() {
logger.Warnf("The service %v has already exported!", svc.Interface)
return nil
}
regUrls := loadRegistries(svc.Registry, svc.Registries, common.PROVIDER)
urlMap := svc.getUrlMap()
protocolConfigs := loadProtocol(svc.Protocol, svc.Protocols)
if len(protocolConfigs) == 0 {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", svc.Interface, svc.Protocol)
return nil
}
ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
proxyFactory := extension.GetProxyFactory(svc.ProxyFactoryKey)
for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(svc.Interface, proto.Name, svc.Group, svc.Version, svc.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.",
svc.Interface, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}
port := proto.Port
if len(proto.Port) == 0 {
port = nextPort.Value.(string)
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
common.WithPath(svc.Interface),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
common.WithParams(urlMap),
common.WithParamsValue(constant.BEAN_NAME_KEY, svc.id),
//common.WithParamsValue(constant.SSL_ENABLED_KEY, strconv.FormatBool(config.GetSslEnabled())),
common.WithMethods(strings.Split(methods, ",")),
common.WithToken(svc.Token),
common.WithParamsValue(constant.METADATATYPE_KEY, svc.metadataType),
)
if len(svc.Tag) > 0 {
ivkURL.AddParam(constant.Tagkey, svc.Tag)
}
// post process the URL to be exported
svc.postProcessConfig(ivkURL)
// config post processor may set "export" to false
if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
return nil
}
if len(regUrls) > 0 {
svc.cacheMutex.Lock()
if svc.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
svc.cacheProtocol = extension.GetProtocol("registry")
}
svc.cacheMutex.Unlock()
for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL
invoker := proxyFactory.GetInvoker(regUrl)
exporter := svc.cacheProtocol.Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
}
svc.exporters = append(svc.exporters, exporter)
}
} else {
if ivkURL.GetParam(constant.INTERFACE_KEY, "") == constant.METADATA_SERVICE_NAME {
ms, err := extension.GetLocalMetadataService("")
if err != nil {
return err
}
ms.SetMetadataServiceURL(ivkURL)
}
invoker := proxyFactory.GetInvoker(ivkURL)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
}
svc.exporters = append(svc.exporters, exporter)
}
publishServiceDefinition(ivkURL)
}
svc.exported.Store(true)
return nil
}
//loadProtocol filter protocols by ids
func loadProtocol(protocolIds []string, protocols map[string]*ProtocolConfig) []*ProtocolConfig {
returnProtocols := make([]*ProtocolConfig, 0, len(protocols))
for _, v := range protocolIds {
for k, config := range protocols {
if v == k {
returnProtocols = append(returnProtocols, config)
}
}
}
return returnProtocols
}
func loadRegistries(registryIds []string, registries map[string]*RegistryConfig, roleType common.RoleType) []*common.URL {
var registryURLs []*common.URL
//trSlice := strings.Split(targetRegistries, ",")
for k, registryConf := range registries {
target := false
// if user not config targetRegistries, default load all
// Notice: in func "func Split(s, sep string) []string" comment:
// if s does not contain sep and sep is not empty, SplitAfter returns
// a slice of length 1 whose only element is s. So we have to add the
// condition when targetRegistries string is not set (it will be "" when not set)
if len(registryIds) == 0 || (len(registryIds) == 1 && registryIds[0] == "") {
target = true
} else {
// else if user config targetRegistries
for _, tr := range registryIds {
if tr == k {
target = true
break
}
}
}
if target {
if registryURL, err := registryConf.toURL(roleType); err != nil {
logger.Errorf("The registry id: %s url is invalid, error: %#v", k, err)
panic(err)
} else {
registryURLs = append(registryURLs, registryURL)
}
}
}
return registryURLs
}
// Unexport will call unexport of all exporters service config exported
func (svc *ServiceConfig) Unexport() {
if !svc.exported.Load() {
return
}
if svc.unexported.Load() {
return
}
func() {
svc.exportersLock.Lock()
defer svc.exportersLock.Unlock()
for _, exporter := range svc.exporters {
exporter.Unexport()
}
svc.exporters = nil
}()
svc.exported.Store(false)
svc.unexported.Store(true)
}
// Implement only store the @s and return
func (svc *ServiceConfig) Implement(s common.RPCService) {
svc.rpcService = s
}
func (svc *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
// first set user params
for k, v := range svc.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, svc.Interface)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, svc.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, svc.Loadbalance)
urlMap.Set(constant.WARMUP_KEY, svc.Warmup)
urlMap.Set(constant.RETRIES_KEY, svc.Retries)
urlMap.Set(constant.GROUP_KEY, svc.Group)
urlMap.Set(constant.VERSION_KEY, svc.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.PROVIDER)).Role())
urlMap.Set(constant.MESSAGE_SIZE_KEY, strconv.Itoa(svc.GrpcMaxMessageSize))
// todo: move
urlMap.Set(constant.SERIALIZATION_KEY, svc.Serialization)
// application config info
//urlMap.Set(constant.APPLICATION_KEY, applicationConfig.Name)
//urlMap.Set(constant.ORGANIZATION_KEY, applicationConfig.Organization)
//urlMap.Set(constant.NAME_KEY, applicationConfig.Name)
//urlMap.Set(constant.MODULE_KEY, applicationConfig.Module)
//urlMap.Set(constant.APP_VERSION_KEY, applicationConfig.Version)
//urlMap.Set(constant.OWNER_KEY, applicationConfig.Owner)
//urlMap.Set(constant.ENVIRONMENT_KEY, applicationConfig.Environment)
// filter
if svc.Filter == "" {
urlMap.Set(constant.SERVICE_FILTER_KEY, constant.DEFAULT_SERVICE_FILTERS)
} else {
urlMap.Set(constant.SERVICE_FILTER_KEY, svc.Filter)
}
// filter special config
urlMap.Set(constant.AccessLogFilterKey, svc.AccessLog)
// tps limiter
urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, svc.TpsLimitStrategy)
urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, svc.TpsLimitInterval)
urlMap.Set(constant.TPS_LIMIT_RATE_KEY, svc.TpsLimitRate)
urlMap.Set(constant.TPS_LIMITER_KEY, svc.TpsLimiter)
urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, svc.TpsLimitRejectedHandler)
// execute limit filter
urlMap.Set(constant.EXECUTE_LIMIT_KEY, svc.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, svc.ExecuteLimitRejectedHandler)
// auth filter
urlMap.Set(constant.SERVICE_AUTH_KEY, svc.Auth)
urlMap.Set(constant.PARAMETER_SIGNATURE_ENABLE_KEY, svc.ParamSign)
// whether to export or not
urlMap.Set(constant.EXPORT_KEY, strconv.FormatBool(svc.export))
for _, v := range svc.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.LoadBalance)
urlMap.Set(prefix+constant.RETRIES_KEY, v.Retries)
urlMap.Set(prefix+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10))
urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy)
urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval)
urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate)
urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)
}
return urlMap
}
// GetExportedUrls will return the url in service config's exporter
func (svc *ServiceConfig) GetExportedUrls() []*common.URL {
if svc.exported.Load() {
var urls []*common.URL
for _, exporter := range svc.exporters {
urls = append(urls, exporter.GetInvoker().GetURL())
}
return urls
}
return nil
}
func (svc *ServiceConfig) publishServiceDefinition(url *common.URL) {
//svc.rootConfig.MetadataReportConfig.
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
}
}
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
func (svc *ServiceConfig) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessServiceConfig(url)
}
}
// ServiceConfigOpt is the option to init ServiceConfig
type ServiceConfigOpt func(config *ServiceConfig) *ServiceConfig
// NewDefaultServiceConfig returns default ServiceConfig
func NewDefaultServiceConfig() *ServiceConfig {
newServiceConfig := &ServiceConfig{
unexported: atomic.NewBool(false),
exported: atomic.NewBool(false),
export: true,
Protocols: make(map[string]*ProtocolConfig),
Registries: make(map[string]*RegistryConfig),
}
newServiceConfig.Params = make(map[string]string)
newServiceConfig.Methods = make([]*MethodConfig, 0, 8)
return newServiceConfig
}
// NewServiceConfig returns ServiceConfig with given @opts
func NewServiceConfig(opts ...ServiceConfigOpt) *ServiceConfig {
defaultServiceConfig := NewDefaultServiceConfig()
for _, v := range opts {
v(defaultServiceConfig)
}
return defaultServiceConfig
}
// WithServiceRegistry returns ServiceConfigOpt with given registryKey @registry
func WithServiceRegistry(registry string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Registry = append(config.Registry, registry)
return config
}
}
// WithServiceProtocolKeys returns ServiceConfigOpt with given protocolKey @protocol
func WithServiceProtocolKeys(protocolKeys ...string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Protocol = protocolKeys
return config
}
}
// WithServiceInterface returns ServiceConfigOpt with given @interfaceName
func WithServiceInterface(interfaceName string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Interface = interfaceName
return config
}
}
// WithServiceMetadataType returns ServiceConfigOpt with given @metadataType
func WithServiceMetadataType(metadataType string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.metadataType = metadataType
return config
}
}
// WithServiceLoadBalance returns ServiceConfigOpt with given load balance @lb
func WithServiceLoadBalance(lb string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Loadbalance = lb
return config
}
}
// WithServiceWarmUpTime returns ServiceConfigOpt with given @warmUp time
func WithServiceWarmUpTime(warmUp string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Warmup = warmUp
return config
}
}
// WithServiceCluster returns ServiceConfigOpt with given cluster name @cluster
func WithServiceCluster(cluster string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Cluster = cluster
return config
}
}
// WithServiceMethod returns ServiceConfigOpt with given @name, @retries and load balance @lb
func WithServiceMethod(name, retries, lb string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Methods = append(config.Methods, &MethodConfig{
Name: name,
Retries: retries,
LoadBalance: lb,
})
return config
}
}
func WithServiceProtocol(protocolName string, protocolConfig *ProtocolConfig) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Protocols[protocolName] = protocolConfig
return config
}
}
func WithServiceRegistries(registryName string, registryConfig *RegistryConfig) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Registries[registryName] = registryConfig
return config
}
}
func WithServiceGroup(group string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Group = group
return config
}
}
func WithServiceVersion(version string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.Version = version
return config
}
}
func WithProxyFactoryKey(proxyFactoryKey string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.ProxyFactoryKey = proxyFactoryKey
return config
}
}
func WithRPCService(service common.RPCService) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.rpcService = service
return config
}
}
func WithServiceID(id string) ServiceConfigOpt {
return func(config *ServiceConfig) *ServiceConfig {
config.id = id
return config
}
}