blob: 51a38a0f16879c4821e711621a03d6e363371521 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"strconv"
"sync"
"time"
)
import (
"github.com/creasty/defaults"
"github.com/dubbogo/gost/log/logger"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/registry"
)
// ServerOptions groups the configuration a server is initialized with.
// ServerOption functions mutate it and init normalizes it.
type ServerOptions struct {
// Provider is the provider-level config; init translates its RegistryIDs and
// ProtocolIDs and verifies it.
Provider *global.ProviderConfig
// Application is the application-level config.
Application *global.ApplicationConfig
// Registries maps a registry ID to its config; init derives Provider.RegistryIDs
// from it when the provider lists none.
Registries map[string]*global.RegistryConfig
// Protocols maps a protocol ID to its config.
Protocols map[string]*global.ProtocolConfig
// Shutdown is handed to graceful_shutdown.Init by init.
Shutdown *global.ShutdownConfig
// NOTE(review): providerCompat is not referenced in this part of the file;
// presumably it is the config-package counterpart of Provider — confirm at its use sites.
providerCompat *config.ProviderConfig
}
// defaultServerOptions returns a ServerOptions whose Application, Provider and
// Shutdown fields hold the defaults supplied by the global package. Registries
// and Protocols are left nil.
func defaultServerOptions() *ServerOptions {
	srvOpts := new(ServerOptions)
	srvOpts.Application = global.DefaultApplicationConfig()
	srvOpts.Provider = global.DefaultProviderConfig()
	srvOpts.Shutdown = global.DefaultShutdownConfig()
	return srvOpts
}
// init applies every ServerOption to srvOpts, fills defaults, normalizes and
// verifies the provider config, and initializes graceful shutdown.
//
// It returns an error when defaults.Set or commonCfg.Verify fails, or when
// AdaptiveServiceVerbose is requested while AdaptiveService is disabled.
//
// todo(DMwangnima): think about the timing to initialize Registry, Protocol, Tracing
func (srvOpts *ServerOptions) init(opts ...ServerOption) error {
	for i := range opts {
		opts[i](srvOpts)
	}

	err := defaults.Set(srvOpts)
	if err != nil {
		return err
	}

	provider := srvOpts.Provider

	// Registry IDs fall back to the IDs of the configured registries.
	provider.RegistryIDs = commonCfg.TranslateIds(provider.RegistryIDs)
	if len(provider.RegistryIDs) == 0 {
		provider.RegistryIDs = getRegistryIds(srvOpts.Registries)
	}
	provider.ProtocolIDs = commonCfg.TranslateIds(provider.ProtocolIDs)

	err = commonCfg.Verify(provider)
	if err != nil {
		return err
	}

	// Verbose mode only makes sense when the adaptive service itself is on.
	if provider.AdaptiveServiceVerbose && !provider.AdaptiveService {
		return perrors.Errorf("The adaptive service is disabled, " +
			"adaptive service verbose should be disabled either.")
	}
	if provider.AdaptiveServiceVerbose {
		logger.Infof("adaptive service verbose is enabled.")
		logger.Debugf("debug-level info could be shown.")
		aslimiter.Verbose = true
	}

	// Hand the shutdown config over to the graceful_shutdown package.
	shutdownOpt := graceful_shutdown.SetShutdown_Config(srvOpts.Shutdown)
	graceful_shutdown.Init(shutdownOpt)

	return nil
}
// ServerOption mutates a ServerOptions. ServerOptions.init applies the options
// it receives in the order given.
type ServerOption func(*ServerOptions)
// ---------- For user ----------

// WithServerFilter returns a ServerOption that stores filter in Provider.Filter.
//
// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithServerFilter(filter string) ServerOption {
	return func(srvOpts *ServerOptions) {
		provider := srvOpts.Provider
		provider.Filter = filter
	}
}
// WithServerRegistryIDs returns a ServerOption that stores registryIDs in
// Provider.RegistryIDs. The slice is stored as passed, not copied.
//
// todo(DMwangnima): think about a more ideal configuration style
func WithServerRegistryIDs(registryIDs []string) ServerOption {
	apply := func(srvOpts *ServerOptions) {
		srvOpts.Provider.RegistryIDs = registryIDs
	}
	return apply
}
// WithServerRegistry builds registry options from opts and returns a
// ServerOption that stores the resulting Registry under its ID in
// ServerOptions.Registries, allocating the map when it is nil.
func WithServerRegistry(opts ...registry.Option) ServerOption {
	built := registry.NewOptions(opts...)
	return func(target *ServerOptions) {
		regs := target.Registries
		if regs == nil {
			regs = make(map[string]*global.RegistryConfig)
			target.Registries = regs
		}
		regs[built.ID] = built.Registry
	}
}
// WithServerProtocolIDs returns a ServerOption that stores protocolIDs in
// Provider.ProtocolIDs. The slice is stored as passed, not copied.
//
// todo(DMwangnima): think about a more ideal configuration style
func WithServerProtocolIDs(protocolIDs []string) ServerOption {
	apply := func(srvOpts *ServerOptions) {
		srvOpts.Provider.ProtocolIDs = protocolIDs
	}
	return apply
}
// WithServerProtocol builds protocol options from opts and returns a
// ServerOption that stores the resulting Protocol under its ID in
// ServerOptions.Protocols, allocating the map when it is nil.
func WithServerProtocol(opts ...protocol.Option) ServerOption {
	built := protocol.NewOptions(opts...)
	return func(target *ServerOptions) {
		pros := target.Protocols
		if pros == nil {
			pros = make(map[string]*global.ProtocolConfig)
			target.Protocols = pros
		}
		pros[built.ID] = built.Protocol
	}
}
// WithServerFilterConf returns a ServerOption that stores conf in
// Provider.FilterConf.
//
// todo(DMwangnima): this configuration would be used by filter/hystrix
// think about a more ideal way to configure
func WithServerFilterConf(conf interface{}) ServerOption {
	return func(srvOpts *ServerOptions) {
		provider := srvOpts.Provider
		provider.FilterConf = conf
	}
}
// WithServerAdaptiveService returns a ServerOption that sets
// Provider.AdaptiveService to true.
func WithServerAdaptiveService() ServerOption {
	enable := func(srvOpts *ServerOptions) {
		srvOpts.Provider.AdaptiveService = true
	}
	return enable
}
// WithServerAdaptiveServiceVerbose returns a ServerOption that sets
// Provider.AdaptiveServiceVerbose to true. ServerOptions.init rejects this
// setting unless Provider.AdaptiveService is also true.
func WithServerAdaptiveServiceVerbose() ServerOption {
	enable := func(srvOpts *ServerOptions) {
		srvOpts.Provider.AdaptiveServiceVerbose = true
	}
	return enable
}
// ========== For framework ==========
// These functions should not be invoked by users

// SetServer_Application returns a ServerOption that replaces
// ServerOptions.Application with application.
func SetServer_Application(application *global.ApplicationConfig) ServerOption {
	assign := func(srvOpts *ServerOptions) {
		srvOpts.Application = application
	}
	return assign
}
// SetServer_Registries returns a ServerOption that replaces
// ServerOptions.Registries with regs. The map is stored as passed, not copied.
func SetServer_Registries(regs map[string]*global.RegistryConfig) ServerOption {
	assign := func(srvOpts *ServerOptions) {
		srvOpts.Registries = regs
	}
	return assign
}
// SetServer_Protocols returns a ServerOption that replaces
// ServerOptions.Protocols with pros. The map is stored as passed, not copied.
func SetServer_Protocols(pros map[string]*global.ProtocolConfig) ServerOption {
	assign := func(srvOpts *ServerOptions) {
		srvOpts.Protocols = pros
	}
	return assign
}
// SetServer_Shutdown returns a ServerOption that replaces
// ServerOptions.Shutdown with shutdown.
func SetServer_Shutdown(shutdown *global.ShutdownConfig) ServerOption {
	assign := func(srvOpts *ServerOptions) {
		srvOpts.Shutdown = shutdown
	}
	return assign
}
// ServiceOptions holds the configuration and runtime state of a single service.
// It contains sync.Mutex fields, so it must be used through a pointer and never copied.
type ServiceOptions struct {
// Application is the application-level config; init converts it into applicationCompat.
Application *global.ApplicationConfig
// Provider supplies fallbacks for the service's RegistryIDs, ProtocolIDs and
// TracingKey in init.
Provider *global.ProviderConfig
// Service is the per-service config that the With* options write to.
Service *global.ServiceConfig
// Registries and Protocols are used by init when Service carries no
// RCRegistriesMap / RCProtocolsMap of its own.
Registries map[string]*global.RegistryConfig
Protocols map[string]*global.ProtocolConfig
Id string
// unexported and exported are reset to false by init.
// NOTE(review): presumably they track the export lifecycle — confirm against the export code.
unexported *atomic.Bool
exported *atomic.Bool
// needExport is set to true by defaultServiceOptions and again by init.
needExport bool
// metadataType is copied from applicationCompat.MetadataType in init.
metadataType string
info *ServiceInfo
ProxyFactoryKey string
rpcService common.RPCService
// NOTE(review): cacheMutex presumably guards cacheProtocol, and exportersLock
// guards exporters — confirm at the use sites.
cacheMutex sync.Mutex
cacheProtocol protocol.Protocol
exportersLock sync.Mutex
exporters []protocol.Exporter
adaptiveService bool
methodsCompat []*config.MethodConfig
// init fills applicationCompat, registriesCompat and protocolsCompat with the
// config-package counterparts of Application, the registries and the protocols.
applicationCompat *config.ApplicationConfig
registriesCompat map[string]*config.RegistryConfig
protocolsCompat map[string]*config.ProtocolConfig
}
// defaultServiceOptions returns a ServiceOptions with default Service and
// Application configs, both export flags set to false and needExport set to
// true. Provider, Registries and Protocols are left unset.
func defaultServiceOptions() *ServiceOptions {
	svcOpts := new(ServiceOptions)
	svcOpts.Service = global.DefaultServiceConfig()
	svcOpts.Application = global.DefaultApplicationConfig()
	svcOpts.unexported = atomic.NewBool(false)
	svcOpts.exported = atomic.NewBool(false)
	svcOpts.needExport = true
	return svcOpts
}
// init applies opts to svcOpts, fills defaults and derives the config-package
// ("compat") counterparts that the rest of the framework reads.
//
// Steps, in order:
//   - run every ServiceOption, then defaults.Set;
//   - reset the exported and unexported flags to false;
//   - when Application is set, build and Init applicationCompat, publish it as
//     config.GetRootConfig().Application, and use its Group/Version for the
//     service when the service leaves them empty;
//   - fall back to svcOpts.Registries / svcOpts.Protocols when the service has
//     no RCRegistriesMap / RCProtocolsMap, then build and Init a compat config
//     for every entry;
//   - resolve RegistryIDs, ProtocolIDs and TracingKey, falling back to the
//     Provider values (and, for ProtocolIDs, to the keys of svcOpts.Protocols);
//     a service that ends up without registry IDs is marked NotRegister;
//   - run svcOpts.check and commonCfg.Verify.
//
// It returns the first error encountered. A failing svcOpts.check is reported
// through the error return rather than a panic, and a nil Provider is
// tolerated: its fallbacks are simply skipped.
func (svcOpts *ServiceOptions) init(opts ...ServiceOption) error {
	for _, opt := range opts {
		opt(svcOpts)
	}
	if err := defaults.Set(svcOpts); err != nil {
		return err
	}

	srv := svcOpts.Service

	svcOpts.exported = atomic.NewBool(false)

	if application := svcOpts.Application; application != nil {
		svcOpts.applicationCompat = compatApplicationConfig(application)
		if err := svcOpts.applicationCompat.Init(); err != nil {
			return err
		}
		// todo(DMwangnima): make this clearer
		// this statement is responsible for setting rootConfig.Application
		// since many modules would retrieve this information directly.
		config.GetRootConfig().Application = svcOpts.applicationCompat
		svcOpts.metadataType = svcOpts.applicationCompat.MetadataType
		if srv.Group == "" {
			srv.Group = svcOpts.applicationCompat.Group
		}
		if srv.Version == "" {
			srv.Version = svcOpts.applicationCompat.Version
		}
	}
	svcOpts.unexported = atomic.NewBool(false)

	// initialize Registries
	if len(srv.RCRegistriesMap) == 0 {
		srv.RCRegistriesMap = svcOpts.Registries
	}
	if len(srv.RCRegistriesMap) > 0 {
		svcOpts.registriesCompat = make(map[string]*config.RegistryConfig, len(srv.RCRegistriesMap))
		for key, reg := range srv.RCRegistriesMap {
			regCompat := compatRegistryConfig(reg)
			svcOpts.registriesCompat[key] = regCompat
			if err := regCompat.Init(); err != nil {
				return err
			}
		}
	}

	// initialize Protocols
	if len(srv.RCProtocolsMap) == 0 {
		srv.RCProtocolsMap = svcOpts.Protocols
	}
	if len(srv.RCProtocolsMap) > 0 {
		svcOpts.protocolsCompat = make(map[string]*config.ProtocolConfig, len(srv.RCProtocolsMap))
		for key, pro := range srv.RCProtocolsMap {
			proCompat := compatProtocolConfig(pro)
			svcOpts.protocolsCompat[key] = proCompat
			if err := proCompat.Init(); err != nil {
				return err
			}
		}
	}

	// defaultServiceOptions does not populate Provider, so it may legitimately
	// be nil here when no SetProvider option was supplied.
	prov := svcOpts.Provider

	srv.RegistryIDs = commonCfg.TranslateIds(srv.RegistryIDs)
	if len(srv.RegistryIDs) == 0 && prov != nil {
		srv.RegistryIDs = prov.RegistryIDs
	}
	// len of a nil slice is 0, so a single length check covers both cases.
	if len(srv.RegistryIDs) == 0 {
		srv.NotRegister = true
	}

	srv.ProtocolIDs = commonCfg.TranslateIds(srv.ProtocolIDs)
	if len(srv.ProtocolIDs) == 0 && prov != nil {
		srv.ProtocolIDs = prov.ProtocolIDs
	}
	if len(srv.ProtocolIDs) == 0 {
		// Last resort: expose the service on every configured protocol.
		for name := range svcOpts.Protocols {
			srv.ProtocolIDs = append(srv.ProtocolIDs, name)
		}
	}

	if srv.TracingKey == "" && prov != nil {
		srv.TracingKey = prov.TracingKey
	}

	// This function already has an error return; use it instead of panicking.
	if err := svcOpts.check(); err != nil {
		return err
	}
	svcOpts.needExport = true
	return commonCfg.Verify(svcOpts)
}
// ServiceOption mutates a ServiceOptions. ServiceOptions.init applies the
// options it receives in the order given.
type ServiceOption func(*ServiceOptions)
// ---------- For user ----------

// WithRegistryIDs returns a ServiceOption that stores registryIDs in
// Service.RegistryIDs. An empty or nil slice is ignored, so it never clears
// IDs that are already configured.
//
// todo(DMwangnima): think about a more ideal configuration style
func WithRegistryIDs(registryIDs []string) ServiceOption {
	return func(cfg *ServiceOptions) {
		// Only a non-empty list is meaningful; the previous `<= 0` test was
		// inverted and dropped every real value.
		if len(registryIDs) > 0 {
			cfg.Service.RegistryIDs = registryIDs
		}
	}
}
// WithFilter returns a ServiceOption that stores filter in Service.Filter.
//
// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithFilter(filter string) ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Filter = filter
	}
}
// WithProtocolIDs returns a ServiceOption that stores protocolIDs in
// Service.ProtocolIDs. An empty or nil slice is ignored, so it never clears
// IDs that are already configured.
//
// todo(DMwangnima): think about a more ideal configuration style
func WithProtocolIDs(protocolIDs []string) ServiceOption {
	return func(cfg *ServiceOptions) {
		// Only a non-empty list is meaningful; the previous `<= 0` test was
		// inverted and dropped every real value.
		if len(protocolIDs) > 0 {
			cfg.Service.ProtocolIDs = protocolIDs
		}
	}
}
// ========== LoadBalance Strategy ==========

// WithLoadBalanceConsistentHashing returns a ServiceOption that sets
// Service.Loadbalance to constant.LoadBalanceKeyConsistentHashing.
func WithLoadBalanceConsistentHashing() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Loadbalance = constant.LoadBalanceKeyConsistentHashing
	}
}
// WithLoadBalanceLeastActive returns a ServiceOption that sets
// Service.Loadbalance to constant.LoadBalanceKeyLeastActive.
func WithLoadBalanceLeastActive() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Loadbalance = constant.LoadBalanceKeyLeastActive
	}
}
// WithLoadBalanceRandom returns a ServiceOption that sets Service.Loadbalance
// to constant.LoadBalanceKeyRandom.
func WithLoadBalanceRandom() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Loadbalance = constant.LoadBalanceKeyRandom
	}
}
// WithLoadBalanceRoundRobin returns a ServiceOption that sets
// Service.Loadbalance to constant.LoadBalanceKeyRoundRobin.
func WithLoadBalanceRoundRobin() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Loadbalance = constant.LoadBalanceKeyRoundRobin
	}
}
// WithLoadBalanceP2C returns a ServiceOption that sets Service.Loadbalance to
// constant.LoadBalanceKeyP2C.
func WithLoadBalanceP2C() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Loadbalance = constant.LoadBalanceKeyP2C
	}
}
// WithLoadBalanceXDSRingHash returns a ServiceOption that sets
// Service.Loadbalance to the XDS ring-hash key.
//
// It previously assigned constant.LoadBalanceKeyLeastActive, a copy-paste slip
// that made this option indistinguishable from WithLoadBalanceLeastActive.
func WithLoadBalanceXDSRingHash() ServiceOption {
	return func(opts *ServiceOptions) {
		opts.Service.Loadbalance = constant.LoadXDSRingHash
	}
}
// WithWarmUp returns a ServiceOption that stores warmUp in Service.Warmup as a
// decimal number of whole seconds. Any sub-second remainder is truncated
// (for example, 1500ms is stored as "1").
func WithWarmUp(warmUp time.Duration) ServiceOption {
	return func(svcOpts *ServiceOptions) {
		wholeSeconds := warmUp / time.Second
		svcOpts.Service.Warmup = strconv.Itoa(int(wholeSeconds))
	}
}
// ========== Cluster Strategy ==========

// WithClusterAvailable returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyAvailable.
func WithClusterAvailable() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyAvailable
	}
}
// WithClusterBroadcast returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyBroadcast.
func WithClusterBroadcast() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyBroadcast
	}
}
// WithClusterFailBack returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyFailback.
func WithClusterFailBack() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyFailback
	}
}
// WithClusterFailFast returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyFailfast.
func WithClusterFailFast() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyFailfast
	}
}
// WithClusterFailOver returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyFailover.
func WithClusterFailOver() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyFailover
	}
}
// WithClusterFailSafe returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyFailsafe.
func WithClusterFailSafe() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyFailsafe
	}
}
// WithClusterForking returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyForking.
func WithClusterForking() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyForking
	}
}
// WithClusterZoneAware returns a ServiceOption that sets Service.Cluster to
// constant.ClusterKeyZoneAware.
func WithClusterZoneAware() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyZoneAware
	}
}
// WithClusterAdaptiveService returns a ServiceOption that sets Service.Cluster
// to constant.ClusterKeyAdaptiveService.
func WithClusterAdaptiveService() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Cluster = constant.ClusterKeyAdaptiveService
	}
}
// WithGroup returns a ServiceOption that stores group in Service.Group.
// ServiceOptions.init only falls back to the application's group when this
// value is empty.
func WithGroup(group string) ServiceOption {
	apply := func(svcOpts *ServiceOptions) {
		svcOpts.Service.Group = group
	}
	return apply
}
// WithVersion returns a ServiceOption that stores version in Service.Version.
// ServiceOptions.init only falls back to the application's version when this
// value is empty.
func WithVersion(version string) ServiceOption {
	apply := func(svcOpts *ServiceOptions) {
		svcOpts.Service.Version = version
	}
	return apply
}
// WithJSON returns a ServiceOption that sets Service.Serialization to
// constant.JSONSerialization.
func WithJSON() ServiceOption {
	return func(svcOpts *ServiceOptions) {
		svc := svcOpts.Service
		svc.Serialization = constant.JSONSerialization
	}
}
// WithToken returns a ServiceOption that stores token in Service.Token.
// It should be used together with WithFilter("token").
func WithToken(token string) ServiceOption {
	apply := func(svcOpts *ServiceOptions) {
		svcOpts.Service.Token = token
	}
	return apply
}
// WithNotRegister returns a ServiceOption that sets Service.NotRegister to true.
func WithNotRegister() ServiceOption {
	mark := func(svcOpts *ServiceOptions) {
		svcOpts.Service.NotRegister = true
	}
	return mark
}
// ---------- For framework ----------
// These functions should not be invoked by users

// SetApplication returns a ServiceOption that replaces
// ServiceOptions.Application with application.
func SetApplication(application *global.ApplicationConfig) ServiceOption {
	assign := func(svcOpts *ServiceOptions) {
		svcOpts.Application = application
	}
	return assign
}
// SetProvider returns a ServiceOption that replaces ServiceOptions.Provider
// with provider.
func SetProvider(provider *global.ProviderConfig) ServiceOption {
	assign := func(svcOpts *ServiceOptions) {
		svcOpts.Provider = provider
	}
	return assign
}
// SetService returns a ServiceOption that replaces ServiceOptions.Service with
// service. Options applied afterwards write to the new Service value.
func SetService(service *global.ServiceConfig) ServiceOption {
	assign := func(svcOpts *ServiceOptions) {
		svcOpts.Service = service
	}
	return assign
}
// SetRegistries returns a ServiceOption that replaces ServiceOptions.Registries
// with regs. The map is stored as passed, not copied.
func SetRegistries(regs map[string]*global.RegistryConfig) ServiceOption {
	assign := func(svcOpts *ServiceOptions) {
		svcOpts.Registries = regs
	}
	return assign
}
// SetProtocols returns a ServiceOption that replaces ServiceOptions.Protocols
// with pros. The map is stored as passed, not copied.
func SetProtocols(pros map[string]*global.ProtocolConfig) ServiceOption {
	assign := func(svcOpts *ServiceOptions) {
		svcOpts.Protocols = pros
	}
	return assign
}