blob: 6f9332175074b2242e04763d55f7ae45d78c7b21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"container/list"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"time"
)
import (
"github.com/dubbogo/gost/log/logger"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
)
// Prefix returns dubbo.service.${InterfaceName}.
func (svcOpts *ServiceOptions) Prefix() string {
return strings.Join([]string{constant.ServiceConfigPrefix, svcOpts.Id}, ".")
}
func (svcOpts *ServiceOptions) check() error {
srv := svcOpts.Service
// check if the limiter has been imported
if srv.TpsLimiter != "" {
_, err := extension.GetTpsLimiter(srv.TpsLimiter)
if err != nil {
panic(err)
}
}
if srv.TpsLimitStrategy != "" {
_, err := extension.GetTpsLimitStrategyCreator(srv.TpsLimitStrategy)
if err != nil {
panic(err)
}
}
if srv.TpsLimitRejectedHandler != "" {
_, err := extension.GetRejectedExecutionHandler(srv.TpsLimitRejectedHandler)
if err != nil {
panic(err)
}
}
if srv.TpsLimitInterval != "" {
tpsLimitInterval, err := strconv.ParseInt(srv.TpsLimitInterval, 0, 0)
if err != nil {
return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.interval for service %svcOpts, please check your configuration", srv.Interface)
}
if tpsLimitInterval < 0 {
return fmt.Errorf("[ServiceConfig] The configuration tps.limit.interval for service %svcOpts must be positive, please check your configuration", srv.Interface)
}
}
if srv.TpsLimitRate != "" {
tpsLimitRate, err := strconv.ParseInt(srv.TpsLimitRate, 0, 0)
if err != nil {
return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.rate for service %svcOpts, please check your configuration", srv.Interface)
}
if tpsLimitRate < 0 {
return fmt.Errorf("[ServiceConfig] The configuration tps.limit.rate for service %svcOpts must be positive, please check your configuration", srv.Interface)
}
}
return nil
}
// InitExported will set exported as false atom bool
func (svcOpts *ServiceOptions) InitExported() {
svcOpts.exported = atomic.NewBool(false)
}
// IsExport will return whether the service config is exported or not
func (svcOpts *ServiceOptions) IsExport() bool {
return svcOpts.exported.Load()
}
// Get Random Port
func getRandomPort(protocolConfigs []*config.ProtocolConfig) *list.List {
ports := list.New()
for _, proto := range protocolConfigs {
if len(proto.Port) > 0 {
continue
}
tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
}
defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
}
return ports
}
func (svcOpts *ServiceOptions) ExportWithoutInfo() error {
return svcOpts.export(nil)
}
func (svcOpts *ServiceOptions) ExportWithInfo(info *ServiceInfo) error {
return svcOpts.export(info)
}
func (svcOpts *ServiceOptions) export(info *ServiceInfo) error {
srv := svcOpts.Service
if info != nil {
srv.Interface = info.InterfaceName
svcOpts.Id = info.InterfaceName
svcOpts.info = info
}
// TODO: delay needExport
if svcOpts.unexported != nil && svcOpts.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported!", srv.Interface)
logger.Errorf(err.Error())
return err
}
if svcOpts.exported != nil && svcOpts.exported.Load() {
logger.Warnf("The service %v has already exported!", srv.Interface)
return nil
}
regUrls := make([]*common.URL, 0)
if !srv.NotRegister {
regUrls = config.LoadRegistries(srv.RegistryIDs, svcOpts.registriesCompat, common.PROVIDER)
}
urlMap := svcOpts.getUrlMap()
protocolConfigs := loadProtocol(srv.ProtocolIDs, svcOpts.protocolsCompat)
if len(protocolConfigs) == 0 {
logger.Warnf("The service %v'svcOpts '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", srv.Interface, srv.ProtocolIDs)
return nil
}
var invoker protocol.Invoker
ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
proxyFactory := extension.GetProxyFactory(svcOpts.ProxyFactoryKey)
for _, proto := range protocolConfigs {
// *important* Register should have been replaced by processing of ServiceInfo.
// but many modules like metadata need to make use of information from ServiceMap.
// todo(DMwangnimg): finish replacing procedure
// registry the service reflect
methods, err := common.ServiceMap.Register(srv.Interface, proto.Name, srv.Group, srv.Version, svcOpts.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v needExport the protocol %v error! Error message is %v.",
srv.Interface, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}
port := proto.Port
if len(proto.Port) == 0 {
port = nextPort.Value.(string)
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
common.WithPath(srv.Interface),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
common.WithParams(urlMap),
common.WithParamsValue(constant.BeanNameKey, svcOpts.Id),
//common.WithParamsValue(constant.SslEnabledKey, strconv.FormatBool(config.GetSslEnabled())),
common.WithMethods(strings.Split(methods, ",")),
// todo(DMwangnima): remove this
common.WithAttribute(constant.ServiceInfoKey, info),
common.WithToken(srv.Token),
common.WithParamsValue(constant.MetadataTypeKey, svcOpts.metadataType),
// fix https://github.com/apache/dubbo-go/issues/2176
common.WithParamsValue(constant.MaxServerSendMsgSize, proto.MaxServerSendMsgSize),
common.WithParamsValue(constant.MaxServerRecvMsgSize, proto.MaxServerRecvMsgSize),
)
if len(srv.Tag) > 0 {
ivkURL.AddParam(constant.Tagkey, srv.Tag)
}
// post process the URL to be exported
svcOpts.postProcessConfig(ivkURL)
// config post processor may set "needExport" to false
if !ivkURL.GetParamBool(constant.ExportKey, true) {
return nil
}
if len(regUrls) > 0 {
svcOpts.cacheMutex.Lock()
if svcOpts.cacheProtocol == nil {
logger.Debugf(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
svcOpts.cacheProtocol = extension.GetProtocol(constant.RegistryProtocol)
}
svcOpts.cacheMutex.Unlock()
for _, regUrl := range regUrls {
setRegistrySubURL(ivkURL, regUrl)
if info == nil {
invoker = proxyFactory.GetInvoker(regUrl)
} else {
invoker = newInfoInvoker(regUrl, info, svcOpts.rpcService)
}
exporter := svcOpts.cacheProtocol.Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
}
svcOpts.exporters = append(svcOpts.exporters, exporter)
}
} else {
if ivkURL.GetParam(constant.InterfaceKey, "") == constant.MetadataServiceName {
ms, err := extension.GetLocalMetadataService("")
if err != nil {
logger.Warnf("needExport org.apache.dubbo.metadata.MetadataService failed beacause of %svcOpts ! pls check if you import _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"", err)
return nil
}
if err := ms.SetMetadataServiceURL(ivkURL); err != nil {
logger.Warnf("SetMetadataServiceURL error = %svcOpts", err)
}
}
if info == nil {
invoker = proxyFactory.GetInvoker(ivkURL)
} else {
invoker = newInfoInvoker(ivkURL, info, svcOpts.rpcService)
}
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
}
svcOpts.exporters = append(svcOpts.exporters, exporter)
}
publishServiceDefinition(ivkURL)
// this protocol would be destroyed in graceful_shutdown
// please refer to (https://github.com/apache/dubbo-go/issues/2429)
graceful_shutdown.RegisterProtocol(proto.Name)
}
svcOpts.exported.Store(true)
return nil
}
// setRegistrySubURL set registry sub url is ivkURl
func setRegistrySubURL(ivkURL *common.URL, regUrl *common.URL) {
ivkURL.AddParam(constant.RegistryKey, regUrl.GetParam(constant.RegistryKey, ""))
regUrl.SubURL = ivkURL
}
// loadProtocol filter protocols by ids
func loadProtocol(protocolIds []string, protocols map[string]*config.ProtocolConfig) []*config.ProtocolConfig {
returnProtocols := make([]*config.ProtocolConfig, 0, len(protocols))
for _, v := range protocolIds {
for k, config := range protocols {
if v == k {
returnProtocols = append(returnProtocols, config)
}
}
}
return returnProtocols
}
// Unexport will call unexport of all exporters service config exported
func (svcOpts *ServiceOptions) Unexport() {
if !svcOpts.exported.Load() {
return
}
if svcOpts.unexported.Load() {
return
}
func() {
svcOpts.exportersLock.Lock()
defer svcOpts.exportersLock.Unlock()
for _, exporter := range svcOpts.exporters {
exporter.UnExport()
}
svcOpts.exporters = nil
}()
svcOpts.exported.Store(false)
svcOpts.unexported.Store(true)
}
// Implement only store the @s and return
func (svcOpts *ServiceOptions) Implement(rpcService common.RPCService) {
svcOpts.rpcService = rpcService
}
func (svcOpts *ServiceOptions) getUrlMap() url.Values {
srv := svcOpts.Service
app := svcOpts.applicationCompat
urlMap := url.Values{}
// first set user params
for k, v := range srv.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.InterfaceKey, srv.Interface)
urlMap.Set(constant.TimestampKey, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.ClusterKey, srv.Cluster)
urlMap.Set(constant.LoadbalanceKey, srv.Loadbalance)
urlMap.Set(constant.WarmupKey, srv.Warmup)
urlMap.Set(constant.RetriesKey, srv.Retries)
if srv.Group != "" {
urlMap.Set(constant.GroupKey, srv.Group)
}
if srv.Version != "" {
urlMap.Set(constant.VersionKey, srv.Version)
}
urlMap.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.ReleaseKey, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SideKey, (common.RoleType(common.PROVIDER)).Role())
// todo: move
urlMap.Set(constant.SerializationKey, srv.Serialization)
// application config info
urlMap.Set(constant.ApplicationKey, app.Name)
urlMap.Set(constant.OrganizationKey, app.Organization)
urlMap.Set(constant.NameKey, app.Name)
urlMap.Set(constant.ModuleKey, app.Module)
urlMap.Set(constant.AppVersionKey, app.Version)
urlMap.Set(constant.OwnerKey, app.Owner)
urlMap.Set(constant.EnvironmentKey, app.Environment)
//filter
var filters string
if srv.Filter == "" {
filters = constant.DefaultServiceFilters
} else {
filters = srv.Filter
}
if svcOpts.adaptiveService {
filters += fmt.Sprintf(",%svcOpts", constant.AdaptiveServiceProviderFilterKey)
}
urlMap.Set(constant.ServiceFilterKey, filters)
// filter special config
urlMap.Set(constant.AccessLogFilterKey, srv.AccessLog)
// tps limiter
urlMap.Set(constant.TPSLimitStrategyKey, srv.TpsLimitStrategy)
urlMap.Set(constant.TPSLimitIntervalKey, srv.TpsLimitInterval)
urlMap.Set(constant.TPSLimitRateKey, srv.TpsLimitRate)
urlMap.Set(constant.TPSLimiterKey, srv.TpsLimiter)
urlMap.Set(constant.TPSRejectedExecutionHandlerKey, srv.TpsLimitRejectedHandler)
urlMap.Set(constant.TracingConfigKey, srv.TracingKey)
// execute limit filter
urlMap.Set(constant.ExecuteLimitKey, srv.ExecuteLimit)
urlMap.Set(constant.ExecuteRejectedExecutionHandlerKey, srv.ExecuteLimitRejectedHandler)
// auth filter
urlMap.Set(constant.ServiceAuthKey, srv.Auth)
urlMap.Set(constant.ParameterSignatureEnableKey, srv.ParamSign)
// whether to needExport or not
urlMap.Set(constant.ExportKey, strconv.FormatBool(svcOpts.needExport))
urlMap.Set(constant.PIDKey, fmt.Sprintf("%d", os.Getpid()))
for _, v := range srv.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LoadbalanceKey, v.LoadBalance)
urlMap.Set(prefix+constant.RetriesKey, v.Retries)
urlMap.Set(prefix+constant.WeightKey, strconv.FormatInt(v.Weight, 10))
urlMap.Set(prefix+constant.TPSLimitStrategyKey, v.TpsLimitStrategy)
urlMap.Set(prefix+constant.TPSLimitIntervalKey, v.TpsLimitInterval)
urlMap.Set(prefix+constant.TPSLimitRateKey, v.TpsLimitRate)
urlMap.Set(constant.ExecuteLimitKey, v.ExecuteLimit)
urlMap.Set(constant.ExecuteRejectedExecutionHandlerKey, v.ExecuteLimitRejectedHandler)
}
return urlMap
}
// GetExportedUrls will return the url in service config's exporter
func (svcOpts *ServiceOptions) GetExportedUrls() []*common.URL {
if svcOpts.exported.Load() {
var urls []*common.URL
for _, exporter := range svcOpts.exporters {
urls = append(urls, exporter.GetInvoker().GetURL())
}
return urls
}
return nil
}
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
func (svcOpts *ServiceOptions) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessServiceConfig(url)
}
}
func publishServiceDefinition(url *common.URL) {
localService, err := extension.GetLocalMetadataService(constant.DefaultKey)
if err != nil {
logger.Warnf("get local metadata service failed, please check if you have imported _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"")
return
}
localService.PublishServiceDefinition(url)
if url.GetParam(constant.MetadataTypeKey, "") != constant.RemoteMetadataStorageType {
return
}
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
}
}
// todo(DMwangnima): think about moving this function to a common place(e.g. /common/config)
func getRegistryIds(registries map[string]*global.RegistryConfig) []string {
ids := make([]string, 0)
for key := range registries {
ids = append(ids, key)
}
return removeDuplicateElement(ids)
}
// removeDuplicateElement remove duplicate element
func removeDuplicateElement(items []string) []string {
result := make([]string, 0, len(items))
temp := map[string]struct{}{}
for _, item := range items {
if _, ok := temp[item]; !ok && item != "" {
temp[item] = struct{}{}
result = append(result, item)
}
}
return result
}