blob: d73190c48f1c98b259732cae61c7e0b63c09a8fa [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package config
import (
import (
gxnet ""
perrors ""
import (
// ServiceConfig is the configuration of the service provider
type ServiceConfig struct {
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Auth string `yaml:"auth" json:"auth,omitempty" property:"auth"`
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
GrpcMaxMessageSize int `default:"4" yaml:"max_message_size" json:"max_message_size,omitempty"`
Protocols map[string]*ProtocolConfig
unexported *atomic.Bool
exported *atomic.Bool
export bool // a flag to control whether the current service should export or not
rpcService common.RPCService
cacheMutex sync.Mutex
cacheProtocol protocol.Protocol
exportersLock sync.Mutex
exporters []protocol.Exporter
// Prefix returns dubbo.service.${interface}.
func (c *ServiceConfig) Prefix() string {
return constant.ServiceConfigPrefix + c.InterfaceName + "."
// UnmarshalYAML unmarshals the ServiceConfig by @unmarshal function
func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := defaults.Set(c); err != nil {
return err
type plain ServiceConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
c.exported = atomic.NewBool(false)
c.unexported = atomic.NewBool(false)
c.export = true
return nil
// NewServiceConfig The only way to get a new ServiceConfig
func NewServiceConfig(id string, context context.Context) *ServiceConfig {
return &ServiceConfig{
context: context,
id: id,
unexported: atomic.NewBool(false),
exported: atomic.NewBool(false),
export: true,
// InitExported will set exported as false atom bool
func (c *ServiceConfig) InitExported() {
c.exported = atomic.NewBool(false)
// IsExport will return whether the service config is exported or not
func (c *ServiceConfig) IsExport() bool {
return c.exported.Load()
// Get Random Port
func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
ports := list.New()
for _, proto := range protocolConfigs {
if len(proto.Port) > 0 {
tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
return ports
// Export exports the service
func (c *ServiceConfig) Export() error {
// TODO: config center start here
// TODO: delay export
if c.unexported != nil && c.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported!", c.InterfaceName)
return err
if c.unexported != nil && c.exported.Load() {
logger.Warnf("The service %v has already exported!", c.InterfaceName)
return nil
regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER)
urlMap := c.getUrlMap()
protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
if len(protocolConfigs) == 0 {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs", c.InterfaceName, c.Protocol)
return nil
ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
proxyFactory := extension.GetProxyFactory(providerConfig.ProxyFactory)
for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.Group, c.Version, c.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.",
c.InterfaceName, proto.Name, err.Error())
return formatErr
port := proto.Port
if len(proto.Port) == 0 {
port = nextPort.Value.(string)
nextPort = nextPort.Next()
ivkURL := common.NewURLWithOptions(
common.WithParamsValue(constant.SSL_ENABLED_KEY, strconv.FormatBool(GetSslEnabled())),
common.WithMethods(strings.Split(methods, ",")),
if len(c.Tag) > 0 {
ivkURL.AddParam(constant.Tagkey, c.Tag)
// post process the URL to be exported
// config post processor may set "export" to false
if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
return nil
if len(regUrls) > 0 {
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL
invoker := proxyFactory.GetInvoker(regUrl)
exporter := c.cacheProtocol.Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
c.exporters = append(c.exporters, exporter)
} else {
invoker := proxyFactory.GetInvoker(ivkURL)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
c.exporters = append(c.exporters, exporter)
return nil
// Unexport will call unexport of all exporters service config exported
func (c *ServiceConfig) Unexport() {
if !c.exported.Load() {
if c.unexported.Load() {
func() {
defer c.exportersLock.Unlock()
for _, exporter := range c.exporters {
c.exporters = nil
// Implement only store the @s and return
func (c *ServiceConfig) Implement(s common.RPCService) {
c.rpcService = s
func (c *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
// first set user params
for k, v := range c.Params {
urlMap.Set(k, v)
urlMap.Set(constant.INTERFACE_KEY, c.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, c.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, c.Loadbalance)
urlMap.Set(constant.WARMUP_KEY, c.Warmup)
urlMap.Set(constant.RETRIES_KEY, c.Retries)
urlMap.Set(constant.GROUP_KEY, c.Group)
urlMap.Set(constant.VERSION_KEY, c.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.PROVIDER)).Role())
urlMap.Set(constant.MESSAGE_SIZE_KEY, strconv.Itoa(c.GrpcMaxMessageSize))
// todo: move
urlMap.Set(constant.SERIALIZATION_KEY, c.Serialization)
// application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
urlMap.Set(constant.NAME_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.MODULE_KEY, providerConfig.ApplicationConfig.Module)
urlMap.Set(constant.APP_VERSION_KEY, providerConfig.ApplicationConfig.Version)
urlMap.Set(constant.OWNER_KEY, providerConfig.ApplicationConfig.Owner)
urlMap.Set(constant.ENVIRONMENT_KEY, providerConfig.ApplicationConfig.Environment)
// filter
urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, c.Filter, constant.DEFAULT_SERVICE_FILTERS))
// filter special config
urlMap.Set(constant.ACCESS_LOG_KEY, c.AccessLog)
// tps limiter
urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, c.TpsLimitStrategy)
urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, c.TpsLimitInterval)
urlMap.Set(constant.TPS_LIMIT_RATE_KEY, c.TpsLimitRate)
urlMap.Set(constant.TPS_LIMITER_KEY, c.TpsLimiter)
urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, c.TpsLimitRejectedHandler)
// execute limit filter
urlMap.Set(constant.EXECUTE_LIMIT_KEY, c.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, c.ExecuteLimitRejectedHandler)
// auth filter
urlMap.Set(constant.SERVICE_AUTH_KEY, c.Auth)
urlMap.Set(constant.PARAMETER_SIGNATURE_ENABLE_KEY, c.ParamSign)
// whether to export or not
urlMap.Set(constant.EXPORT_KEY, strconv.FormatBool(c.export))
for _, v := range c.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.LoadBalance)
urlMap.Set(prefix+constant.RETRIES_KEY, v.Retries)
urlMap.Set(prefix+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10))
urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy)
urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval)
urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate)
urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)
return urlMap
// GetExportedUrls will return the url in service config's exporter
func (c *ServiceConfig) GetExportedUrls() []*common.URL {
if c.exported.Load() {
var urls []*common.URL
for _, exporter := range c.exporters {
urls = append(urls, exporter.GetInvoker().GetURL())
return urls
return nil
func publishServiceDefinition(url *common.URL) {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
func (c *ServiceConfig) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {