feat: finish server layer (#2425)
diff --git a/client/action.go b/client/action.go
index d20fd9d..58893e5 100644
--- a/client/action.go
+++ b/client/action.go
@@ -39,6 +39,7 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/proxy"
@@ -252,6 +253,9 @@
}
opts.pxy.Implement(srv)
}
+ // this protocol would be destroyed in graceful_shutdown
+ // please refer to (https://github.com/apache/dubbo-go/issues/2429)
+ graceful_shutdown.RegisterProtocol(ref.Protocol)
}
func (opts *ClientOptions) CheckAvailable() bool {
diff --git a/client/options.go b/client/options.go
index 3f6169f..c5e1760 100644
--- a/client/options.go
+++ b/client/options.go
@@ -19,6 +19,7 @@
import (
"strconv"
+ "time"
)
import (
@@ -28,10 +29,13 @@
import (
"dubbo.apache.org/dubbo-go/v3/common"
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
+ "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/proxy"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
type ClientOptions struct {
@@ -39,6 +43,7 @@
Consumer *global.ConsumerConfig
Reference *global.ReferenceConfig
Registries map[string]*global.RegistryConfig
+ Shutdown *global.ShutdownConfig
pxy *proxy.Proxy
id string
@@ -54,7 +59,10 @@
func defaultClientOptions() *ClientOptions {
return &ClientOptions{
- Reference: global.DefaultReferenceConfig(),
+ Application: global.DefaultApplicationConfig(),
+ Consumer: global.DefaultConsumerConfig(),
+ Reference: global.DefaultReferenceConfig(),
+ Shutdown: global.DefaultShutdownConfig(),
}
}
@@ -103,6 +111,10 @@
// todo(DMwangnima): move to registry package
// init registries
+ var emptyRegIDsFlag bool
+ if ref.RegistryIDs == nil || len(ref.RegistryIDs) <= 0 {
+ emptyRegIDsFlag = true
+ }
regs := cliOpts.Registries
if regs != nil {
cliOpts.registriesCompat = make(map[string]*config.RegistryConfig)
@@ -111,10 +123,16 @@
if err := cliOpts.registriesCompat[key].Init(); err != nil {
return err
}
+ if emptyRegIDsFlag {
+ ref.RegistryIDs = append(ref.RegistryIDs, key)
+ }
}
}
ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs)
+ // init graceful_shutdown
+ graceful_shutdown.Init(graceful_shutdown.WithShutdown_Config(cliOpts.Shutdown))
+
return commonCfg.Verify(cliOpts)
}
@@ -122,8 +140,9 @@
// ---------- For user ----------
-func WithCheck(check bool) ClientOption {
+func WithCheck() ClientOption {
return func(opts *ClientOptions) {
+ check := true
opts.Reference.Check = &check
}
}
@@ -134,18 +153,14 @@
}
}
+// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithFilter(filter string) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Filter = filter
}
}
-func WithProtocol(protocol string) ClientOption {
- return func(opts *ClientOptions) {
- opts.Reference.Protocol = protocol
- }
-}
-
+// todo(DMwangnima): think about a more ideal configuration style
func WithRegistryIDs(registryIDs []string) ClientOption {
return func(opts *ClientOptions) {
if len(registryIDs) > 0 {
@@ -154,15 +169,122 @@
}
}
-func WithCluster(cluster string) ClientOption {
- return func(opts *ClientOptions) {
- opts.Reference.Cluster = cluster
+func WithRegistry(key string, opts ...registry.Option) ClientOption {
+ regOpts := registry.DefaultOptions()
+ for _, opt := range opts {
+ opt(regOpts)
+ }
+
+ return func(cliOpts *ClientOptions) {
+ if cliOpts.Registries == nil {
+ cliOpts.Registries = make(map[string]*global.RegistryConfig)
+ }
+ cliOpts.Registries[key] = regOpts.Registry
}
}
-func WithLoadBalance(loadBalance string) ClientOption {
+func WithShutdown(opts ...graceful_shutdown.Option) ClientOption {
+ sdOpts := graceful_shutdown.DefaultOptions()
+ for _, opt := range opts {
+ opt(sdOpts)
+ }
+
+ return func(cliOpts *ClientOptions) {
+ cliOpts.Shutdown = sdOpts.Shutdown
+ }
+}
+
+// ========== Cluster Strategy ==========
+
+func WithClusterAvailable() ClientOption {
return func(opts *ClientOptions) {
- opts.Reference.Loadbalance = loadBalance
+ opts.Reference.Cluster = constant.ClusterKeyAvailable
+ }
+}
+
+func WithClusterBroadcast() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyBroadcast
+ }
+}
+
+func WithClusterFailBack() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyFailback
+ }
+}
+
+func WithClusterFailFast() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyFailfast
+ }
+}
+
+func WithClusterFailOver() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyFailover
+ }
+}
+
+func WithClusterFailSafe() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyFailsafe
+ }
+}
+
+func WithClusterForking() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyForking
+ }
+}
+
+func WithClusterZoneAware() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyZoneAware
+ }
+}
+
+func WithClusterAdaptiveService() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Cluster = constant.ClusterKeyAdaptiveService
+ }
+}
+
+// ========== LoadBalance Strategy ==========
+
+func WithLoadBalanceConsistentHashing() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Loadbalance = constant.LoadBalanceKeyConsistentHashing
+ }
+}
+
+func WithLoadBalanceLeastActive() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive
+ }
+}
+
+func WithLoadBalanceRandom() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Loadbalance = constant.LoadBalanceKeyRandom
+ }
+}
+
+func WithLoadBalanceRoundRobin() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Loadbalance = constant.LoadBalanceKeyRoundRobin
+ }
+}
+
+func WithLoadBalanceP2C() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Loadbalance = constant.LoadBalanceKeyP2C
+ }
+}
+
+func WithLoadBalanceXDSRingHash() ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive
}
}
@@ -184,23 +306,24 @@
}
}
-func WithSerialization(serialization string) ClientOption {
+func WithJSON() ClientOption {
return func(opts *ClientOptions) {
- opts.Reference.Serialization = serialization
+ opts.Reference.Serialization = constant.JSONSerialization
}
}
-func WithProviderBy(providedBy string) ClientOption {
+func WithProvidedBy(providedBy string) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.ProvidedBy = providedBy
}
}
-func WithAsync(async bool) ClientOption {
- return func(opts *ClientOptions) {
- opts.Reference.Async = async
- }
-}
+// todo(DMwangnima): implement this functionality
+//func WithAsync() ClientOption {
+// return func(opts *ClientOptions) {
+// opts.Reference.Async = true
+// }
+//}
func WithParams(params map[string]string) ClientOption {
return func(opts *ClientOptions) {
@@ -208,11 +331,16 @@
}
}
-func WithGeneric(generic string) ClientOption {
- return func(opts *ClientOptions) {
- opts.Reference.Generic = generic
- }
-}
+// todo(DMwangnima): implement this functionality
+//func WithGeneric(generic bool) ClientOption {
+// return func(opts *ClientOptions) {
+// if generic {
+// opts.Reference.Generic = "true"
+// } else {
+// opts.Reference.Generic = "false"
+// }
+// }
+//}
func WithSticky(sticky bool) ClientOption {
return func(opts *ClientOptions) {
@@ -220,9 +348,9 @@
}
}
-func WithRequestTimeout(timeout string) ClientOption {
+func WithRequestTimeout(timeout time.Duration) ClientOption {
return func(opts *ClientOptions) {
- opts.Reference.RequestTimeout = timeout
+ opts.Reference.RequestTimeout = timeout.String()
}
}
@@ -245,51 +373,35 @@
}
// ---------- For framework ----------
+// These functions should not be invoked by users
-func WithRegistryConfig(key string, opts ...global.RegistryOption) ClientOption {
- regCfg := new(global.RegistryConfig)
- for _, opt := range opts {
- opt(regCfg)
- }
-
+func SetRegistries(regs map[string]*global.RegistryConfig) ClientOption {
return func(opts *ClientOptions) {
- if opts.Registries == nil {
- opts.Registries = make(map[string]*global.RegistryConfig)
- }
- opts.Registries[key] = regCfg
+ opts.Registries = regs
}
}
-func WithApplicationConfig(opts ...global.ApplicationOption) ClientOption {
- appCfg := new(global.ApplicationConfig)
- for _, opt := range opts {
- opt(appCfg)
- }
-
+func SetApplication(application *global.ApplicationConfig) ClientOption {
return func(opts *ClientOptions) {
- opts.Application = appCfg
+ opts.Application = application
}
}
-func WithConsumerConfig(opts ...global.ConsumerOption) ClientOption {
- conCfg := new(global.ConsumerConfig)
- for _, opt := range opts {
- opt(conCfg)
- }
-
+func SetConsumer(consumer *global.ConsumerConfig) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer = conCfg
+ opts.Consumer = consumer
}
}
-func WithReferenceConfig(opts ...global.ReferenceOption) ClientOption {
- refCfg := new(global.ReferenceConfig)
- for _, opt := range opts {
- opt(refCfg)
- }
-
+func SetReference(reference *global.ReferenceConfig) ClientOption {
return func(opts *ClientOptions) {
- opts.Reference = refCfg
+ opts.Reference = reference
+ }
+}
+
+func SetShutdown(shutdown *global.ShutdownConfig) ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Shutdown = shutdown
}
}
diff --git a/common/constant/key.go b/common/constant/key.go
index 54c1ca3..2b6f1ca 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -144,6 +144,7 @@
CallHTTPTypeKey = "call-http-type"
CallHTTP = "http"
CallHTTP2 = "http2"
+ ServiceInfoKey = "service-info"
)
const (
diff --git a/common/constant/serialization.go b/common/constant/serialization.go
index 620037d..d1649f5 100644
--- a/common/constant/serialization.go
+++ b/common/constant/serialization.go
@@ -26,4 +26,5 @@
Hessian2Serialization = "hessian2"
ProtobufSerialization = "protobuf"
MsgpackSerialization = "msgpack"
+ JSONSerialization = "json"
)
diff --git a/common/url.go b/common/url.go
index 3bc481b..79917a4 100644
--- a/common/url.go
+++ b/common/url.go
@@ -44,7 +44,6 @@
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/server"
)
// dubbo role type constant
@@ -111,11 +110,12 @@
paramsLock sync.RWMutex
params url.Values
- Path string // like /com.ikurento.dubbo.UserProvider
- Username string
- Password string
- Methods []string
- MethodInfo []server.MethodInfo
+ Path string // like /com.ikurento.dubbo.UserProvider
+ Username string
+ Password string
+ Methods []string
+ // Attributes should not be transported
+ Attributes map[string]interface{} `hessian:"-"`
// special for registry
SubURL *URL
}
@@ -220,10 +220,13 @@
}
}
-// WithMethodInfos sets methodInfos for URL
-func WithMethodInfos(methodInfos []server.MethodInfo) Option {
+// WithAttribute sets attribute for URL
+func WithAttribute(key string, attribute interface{}) Option {
return func(url *URL) {
- url.MethodInfo = methodInfos
+ if url.Attributes == nil {
+ url.Attributes = make(map[string]interface{})
+ }
+ url.Attributes[key] = attribute
}
}
@@ -778,7 +781,8 @@
}
// finally execute methodConfigMergeFcn
- for _, method := range referenceURL.Methods {
+ mergedURL.Methods = make([]string, len(referenceURL.Methods))
+ for i, method := range referenceURL.Methods {
for _, paramKey := range []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey} {
if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 {
params[paramKey] = []string{v}
@@ -790,6 +794,7 @@
params[methodsKey] = []string{v}
}
//}
+ mergedURL.Methods[i] = method
}
}
// In this way, we will raise some performance.
diff --git a/compat.go b/compat.go
index 182ff13..19202bf 100644
--- a/compat.go
+++ b/compat.go
@@ -27,6 +27,9 @@
)
func compatRootConfig(c *InstanceOptions) *config.RootConfig {
+ if c == nil {
+ return nil
+ }
proCompat := make(map[string]*config.ProtocolConfig)
for k, v := range c.Protocols {
proCompat[k] = compatProtocolConfig(v)
@@ -63,6 +66,9 @@
}
func compatApplicationConfig(c *global.ApplicationConfig) *config.ApplicationConfig {
+ if c == nil {
+ return nil
+ }
return &config.ApplicationConfig{
Organization: c.Organization,
Name: c.Name,
@@ -77,6 +83,9 @@
}
func compatProtocolConfig(c *global.ProtocolConfig) *config.ProtocolConfig {
+ if c == nil {
+ return nil
+ }
return &config.ProtocolConfig{
Name: c.Name,
Ip: c.Ip,
@@ -88,6 +97,9 @@
}
func compatRegistryConfig(c *global.RegistryConfig) *config.RegistryConfig {
+ if c == nil {
+ return nil
+ }
return &config.RegistryConfig{
Protocol: c.Protocol,
Timeout: c.Timeout,
@@ -109,6 +121,9 @@
}
func compatCenterConfig(c *global.CenterConfig) *config.CenterConfig {
+ if c == nil {
+ return nil
+ }
return &config.CenterConfig{
Protocol: c.Protocol,
Address: c.Address,
@@ -126,6 +141,9 @@
}
func compatMetadataReportConfig(c *global.MetadataReportConfig) *config.MetadataReportConfig {
+ if c == nil {
+ return nil
+ }
return &config.MetadataReportConfig{
Protocol: c.Protocol,
Address: c.Address,
@@ -138,12 +156,20 @@
}
func compatProviderConfig(c *global.ProviderConfig) *config.ProviderConfig {
+ if c == nil {
+ return nil
+ }
+ services := make(map[string]*config.ServiceConfig)
+ for key, svc := range c.Services {
+ services[key] = compatServiceConfig(svc)
+ }
return &config.ProviderConfig{
Filter: c.Filter,
Register: c.Register,
RegistryIDs: c.RegistryIDs,
ProtocolIDs: c.ProtocolIDs,
TracingKey: c.TracingKey,
+ Services: services,
ProxyFactory: c.ProxyFactory,
FilterConf: c.FilterConf,
ConfigType: c.ConfigType,
@@ -152,7 +178,81 @@
}
}
+func compatServiceConfig(c *global.ServiceConfig) *config.ServiceConfig {
+ if c == nil {
+ return nil
+ }
+ methods := make([]*config.MethodConfig, len(c.Methods))
+ for i, method := range c.Methods {
+ methods[i] = compatMethodConfig(method)
+ }
+ protocols := make(map[string]*config.ProtocolConfig)
+ for key, pro := range c.RCProtocolsMap {
+ protocols[key] = compatProtocolConfig(pro)
+ }
+ registries := make(map[string]*config.RegistryConfig)
+ for key, reg := range c.RCRegistriesMap {
+ registries[key] = compatRegistryConfig(reg)
+ }
+ return &config.ServiceConfig{
+ Filter: c.Filter,
+ ProtocolIDs: c.ProtocolIDs,
+ Interface: c.Interface,
+ RegistryIDs: c.RegistryIDs,
+ Cluster: c.Cluster,
+ Loadbalance: c.Loadbalance,
+ Group: c.Group,
+ Version: c.Version,
+ Methods: methods,
+ Warmup: c.Warmup,
+ Retries: c.Retries,
+ Serialization: c.Serialization,
+ Params: c.Params,
+ Token: c.Token,
+ AccessLog: c.AccessLog,
+ TpsLimiter: c.TpsLimiter,
+ TpsLimitInterval: c.TpsLimitInterval,
+ TpsLimitRate: c.TpsLimitRate,
+ TpsLimitStrategy: c.TpsLimitStrategy,
+ TpsLimitRejectedHandler: c.TpsLimitRejectedHandler,
+ ExecuteLimit: c.ExecuteLimit,
+ ExecuteLimitRejectedHandler: c.ExecuteLimitRejectedHandler,
+ Auth: c.Auth,
+ NotRegister: c.NotRegister,
+ ParamSign: c.ParamSign,
+ Tag: c.Tag,
+ TracingKey: c.TracingKey,
+ RCProtocolsMap: protocols,
+ RCRegistriesMap: registries,
+ ProxyFactoryKey: c.ProxyFactoryKey,
+ }
+}
+
+func compatMethodConfig(c *global.MethodConfig) *config.MethodConfig {
+ if c == nil {
+ return nil
+ }
+ return &config.MethodConfig{
+ InterfaceId: c.InterfaceId,
+ InterfaceName: c.InterfaceName,
+ Name: c.Name,
+ Retries: c.Retries,
+ LoadBalance: c.LoadBalance,
+ Weight: c.Weight,
+ TpsLimitInterval: c.TpsLimitInterval,
+ TpsLimitRate: c.TpsLimitRate,
+ TpsLimitStrategy: c.TpsLimitStrategy,
+ ExecuteLimit: c.ExecuteLimit,
+ ExecuteLimitRejectedHandler: c.ExecuteLimitRejectedHandler,
+ Sticky: c.Sticky,
+ RequestTimeout: c.RequestTimeout,
+ }
+}
+
func compatConsumerConfig(c *global.ConsumerConfig) *config.ConsumerConfig {
+ if c == nil {
+ return nil
+ }
return &config.ConsumerConfig{
Filter: c.Filter,
RegistryIDs: c.RegistryIDs,
@@ -169,6 +269,9 @@
}
func compatMetricConfig(c *global.MetricConfig) *config.MetricConfig {
+ if c == nil {
+ return nil
+ }
return &config.MetricConfig{
Mode: c.Mode,
Namespace: c.Namespace,
@@ -182,6 +285,9 @@
}
func compatTracingConfig(c *global.TracingConfig) *config.TracingConfig {
+ if c == nil {
+ return nil
+ }
return &config.TracingConfig{
Name: c.Name,
ServiceName: c.ServiceName,
@@ -191,6 +297,9 @@
}
func compatLoggerConfig(c *global.LoggerConfig) *config.LoggerConfig {
+ if c == nil {
+ return nil
+ }
return &config.LoggerConfig{
Driver: c.Driver,
Level: c.Level,
@@ -201,6 +310,9 @@
}
func compatFile(c *global.File) *config.File {
+ if c == nil {
+ return nil
+ }
return &config.File{
Name: c.Name,
MaxSize: c.MaxSize,
@@ -211,6 +323,9 @@
}
func compatShutdownConfig(c *global.ShutdownConfig) *config.ShutdownConfig {
+ if c == nil {
+ return nil
+ }
cfg := &config.ShutdownConfig{
Timeout: c.Timeout,
StepTimeout: c.StepTimeout,
@@ -226,18 +341,27 @@
}
func compatCustomConfig(c *global.CustomConfig) *config.CustomConfig {
+ if c == nil {
+ return nil
+ }
return &config.CustomConfig{
ConfigMap: c.ConfigMap,
}
}
func compatProfilesConfig(c *global.ProfilesConfig) *config.ProfilesConfig {
+ if c == nil {
+ return nil
+ }
return &config.ProfilesConfig{
Active: c.Active,
}
}
func compatTLSConfig(c *global.TLSConfig) *config.TLSConfig {
+ if c == nil {
+ return nil
+ }
return &config.TLSConfig{
CACertFile: c.CACertFile,
TLSCertFile: c.TLSCertFile,
diff --git a/dubbo.go b/dubbo.go
index 18629b4..6513653 100644
--- a/dubbo.go
+++ b/dubbo.go
@@ -24,6 +24,7 @@
import (
"dubbo.apache.org/dubbo-go/v3/client"
"dubbo.apache.org/dubbo-go/v3/global"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
// Instance is the highest layer conception that user could touch. It is mapped from RootConfig.
@@ -58,62 +59,30 @@
conCfg := ins.insOpts.Consumer
appCfg := ins.insOpts.Application
regsCfg := ins.insOpts.Registries
- // todo(DMwangnima): use slices to maintain options
+ sdCfg := ins.insOpts.Shutdown
if conCfg != nil {
+ refCfg := &global.ReferenceConfig{
+ Check: &conCfg.Check,
+ Filter: conCfg.Filter,
+ Protocol: conCfg.Protocol,
+ RegistryIDs: conCfg.RegistryIDs,
+ TracingKey: conCfg.TracingKey,
+ }
// these options come from Consumer and Root.
// for dubbo-go developers, referring config/ConsumerConfig.Init and config/ReferenceConfig
cliOpts = append(cliOpts,
- client.WithReferenceConfig(
- global.WithReference_Filter(conCfg.Filter),
- global.WithReference_RegistryIDs(conCfg.RegistryIDs),
- global.WithReference_Protocol(conCfg.Protocol),
- global.WithReference_TracingKey(conCfg.TracingKey),
- global.WithReference_Check(conCfg.Check),
- ),
- client.WithConsumerConfig(
- global.WithConsumer_MeshEnabled(conCfg.MeshEnabled),
- global.WithConsumer_AdaptiveService(conCfg.AdaptiveService),
- global.WithConsumer_ProxyFactory(conCfg.ProxyFactory),
- ),
+ client.SetReference(refCfg),
+ client.SetConsumer(conCfg),
)
}
if appCfg != nil {
- cliOpts = append(cliOpts,
- client.WithApplicationConfig(
- global.WithApplication_Name(appCfg.Name),
- global.WithApplication_Organization(appCfg.Organization),
- global.WithApplication_Module(appCfg.Module),
- global.WithApplication_Version(appCfg.Version),
- global.WithApplication_Owner(appCfg.Owner),
- global.WithApplication_Environment(appCfg.Environment),
- global.WithApplication_Group(appCfg.Group),
- global.WithApplication_MetadataType(appCfg.MetadataType),
- ),
- )
+ cliOpts = append(cliOpts, client.SetApplication(appCfg))
}
if regsCfg != nil {
- for key, reg := range regsCfg {
- cliOpts = append(cliOpts,
- client.WithRegistryConfig(key,
- global.WithRegistry_Protocol(reg.Protocol),
- global.WithRegistry_Timeout(reg.Timeout),
- global.WithRegistry_Group(reg.Group),
- global.WithRegistry_Namespace(reg.Namespace),
- global.WithRegistry_TTL(reg.TTL),
- global.WithRegistry_Address(reg.Address),
- global.WithRegistry_Username(reg.Username),
- global.WithRegistry_Password(reg.Password),
- global.WithRegistry_Simplified(reg.Simplified),
- global.WithRegistry_Preferred(reg.Preferred),
- global.WithRegistry_Zone(reg.Zone),
- global.WithRegistry_Weight(reg.Weight),
- global.WithRegistry_Params(reg.Params),
- global.WithRegistry_RegistryType(reg.RegistryType),
- global.WithRegistry_UseAsMetaReport(reg.UseAsMetaReport),
- global.WithRegistry_UseAsConfigCenter(reg.UseAsConfigCenter),
- ),
- )
- }
+ cliOpts = append(cliOpts, client.SetRegistries(regsCfg))
+ }
+ if sdCfg != nil {
+ cliOpts = append(cliOpts, client.SetShutdown(sdCfg))
}
// options passed by users has higher priority
cliOpts = append(cliOpts, opts...)
@@ -125,3 +94,52 @@
return cli, nil
}
+
+// NewServer is like server.NewServer, but inject configurations from RootConfig.
+func (ins *Instance) NewServer(opts ...server.ServerOption) (*server.Server, error) {
+ if ins == nil || ins.insOpts == nil {
+ return nil, errors.New("Instance has not been initialized")
+ }
+
+ var srvOpts []server.ServerOption
+ appCfg := ins.insOpts.Application
+ regsCfg := ins.insOpts.Registries
+ prosCfg := ins.insOpts.Protocols
+ trasCfg := ins.insOpts.Tracing
+ sdCfg := ins.insOpts.Shutdown
+ if appCfg != nil {
+ srvOpts = append(srvOpts,
+ server.SetServer_Application(appCfg),
+ //server.WithServer_ApplicationConfig(
+ // global.WithApplication_Name(appCfg.Name),
+ // global.WithApplication_Organization(appCfg.Organization),
+ // global.WithApplication_Module(appCfg.Module),
+ // global.WithApplication_Version(appCfg.Version),
+ // global.WithApplication_Owner(appCfg.Owner),
+ // global.WithApplication_Environment(appCfg.Environment),
+ //),
+ )
+ }
+ if regsCfg != nil {
+ srvOpts = append(srvOpts, server.SetServer_Registries(regsCfg))
+ }
+ if prosCfg != nil {
+ srvOpts = append(srvOpts, server.SetServer_Protocols(prosCfg))
+ }
+ if trasCfg != nil {
+ srvOpts = append(srvOpts, server.SetServer_Tracings(trasCfg))
+ }
+ if sdCfg != nil {
+ srvOpts = append(srvOpts, server.SetServer_Shutdown(sdCfg))
+ }
+
+ // options passed by users have higher priority
+ srvOpts = append(srvOpts, opts...)
+
+ srv, err := server.NewServer(srvOpts...)
+ if err != nil {
+ return nil, err
+ }
+
+ return srv, nil
+}
diff --git a/global/profiles_config.go b/global/profiles_config.go
index eb71854..d829dd8 100644
--- a/global/profiles_config.go
+++ b/global/profiles_config.go
@@ -23,7 +23,7 @@
}
func DefaultProfilesConfig() *ProfilesConfig {
- return nil
+ return &ProfilesConfig{}
}
type ProfilesOption func(*ProfilesConfig)
diff --git a/global/protocol_config.go b/global/protocol_config.go
index 2872c5b..25f1ae2 100644
--- a/global/protocol_config.go
+++ b/global/protocol_config.go
@@ -31,6 +31,10 @@
MaxServerRecvMsgSize string `default:"4mib" yaml:"max-server-recv-msg-size" json:"max-server-recv-msg-size,omitempty"`
}
+func DefaultProtocolConfig() *ProtocolConfig {
+ return &ProtocolConfig{}
+}
+
type ProtocolOption func(*ProtocolConfig)
func WithProtocol_Name(name string) ProtocolOption {
@@ -51,9 +55,9 @@
}
}
-func WithProtocol_Param(param interface{}) ProtocolOption {
+func WithProtocol_Params(params interface{}) ProtocolOption {
return func(cfg *ProtocolConfig) {
- cfg.Params = param
+ cfg.Params = params
}
}
diff --git a/global/provider_config.go b/global/provider_config.go
index 3edc5e0..ffba16e 100644
--- a/global/provider_config.go
+++ b/global/provider_config.go
@@ -29,11 +29,11 @@
// TracingKey is tracing ids list
TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"`
// there is no need to configure Services
- //// Services services
- //Services map[string]*server.ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
- ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"`
- FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
- ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`
+ // Services services
+ Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
+ ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"`
+ FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
+ ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`
// adaptive service
AdaptiveService bool `yaml:"adaptive-service" json:"adaptive-service" property:"adaptive-service"`
AdaptiveServiceVerbose bool `yaml:"adaptive-service-verbose" json:"adaptive-service-verbose" property:"adaptive-service-verbose"`
@@ -43,6 +43,7 @@
return &ProviderConfig{
RegistryIDs: make([]string, 8),
ProtocolIDs: make([]string, 8),
+ Services: make(map[string]*ServiceConfig),
}
}
diff --git a/global/reference_config.go b/global/reference_config.go
index 092edd0..94070fd 100644
--- a/global/reference_config.go
+++ b/global/reference_config.go
@@ -47,15 +47,17 @@
MeshProviderPort int `yaml:"mesh-provider-port" json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"`
}
-type ReferenceOption func(*ReferenceConfig)
-
func DefaultReferenceConfig() *ReferenceConfig {
return &ReferenceConfig{
- Methods: make([]*MethodConfig, 0, 8),
- Params: make(map[string]string, 8),
+ // use Triple protocol by default
+ Protocol: "tri",
+ Methods: make([]*MethodConfig, 0, 8),
+ Params: make(map[string]string, 8),
}
}
+type ReferenceOption func(*ReferenceConfig)
+
func WithReference_InterfaceName(name string) ReferenceOption {
return func(cfg *ReferenceConfig) {
cfg.InterfaceName = name
@@ -88,7 +90,7 @@
func WithReference_RegistryIDs(registryIDs []string) ReferenceOption {
return func(cfg *ReferenceConfig) {
- if len(registryIDs) <= 0 {
+ if len(registryIDs) >= 0 {
cfg.RegistryIDs = registryIDs
}
}
diff --git a/global/registry_config.go b/global/registry_config.go
index 7330ef2..f96ff52 100644
--- a/global/registry_config.go
+++ b/global/registry_config.go
@@ -41,6 +41,10 @@
UseAsConfigCenter bool `default:"true" yaml:"use-as-config-center" json:"use-as-config-center,omitempty" property:"use-as-config-center"`
}
+func DefaultRegistryConfig() *RegistryConfig {
+ return &RegistryConfig{}
+}
+
type RegistryOption func(*RegistryConfig)
func WithRegistry_Protocol(protocol string) RegistryOption {
diff --git a/global/service_config.go b/global/service_config.go
index a098e97..95d271d 100644
--- a/global/service_config.go
+++ b/global/service_config.go
@@ -29,14 +29,20 @@
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"`
+
+ RCProtocolsMap map[string]*ProtocolConfig
+ RCRegistriesMap map[string]*RegistryConfig
+ ProxyFactoryKey string
}
type ServiceOption func(*ServiceConfig)
func DefaultServiceConfig() *ServiceConfig {
return &ServiceConfig{
- Methods: make([]*MethodConfig, 0, 8),
- Params: make(map[string]string, 8),
+ Methods: make([]*MethodConfig, 0, 8),
+ Params: make(map[string]string, 8),
+ RCProtocolsMap: make(map[string]*ProtocolConfig),
+ RCRegistriesMap: make(map[string]*RegistryConfig),
}
}
@@ -205,3 +211,27 @@
cfg.TracingKey = tracingKey
}
}
+
+func WithService_RCProtocol(name string, protocol *ProtocolConfig) ServiceOption {
+ return func(cfg *ServiceConfig) {
+ if cfg.RCProtocolsMap == nil {
+ cfg.RCProtocolsMap = make(map[string]*ProtocolConfig)
+ }
+ cfg.RCProtocolsMap[name] = protocol
+ }
+}
+
+func WithService_RCRegistry(name string, registry *RegistryConfig) ServiceOption {
+ return func(cfg *ServiceConfig) {
+ if cfg.RCRegistriesMap == nil {
+ cfg.RCRegistriesMap = make(map[string]*RegistryConfig)
+ }
+ cfg.RCRegistriesMap[name] = registry
+ }
+}
+
+func WithService_ProxyFactoryKey(factory string) ServiceOption {
+ return func(cfg *ServiceConfig) {
+ cfg.ProxyFactoryKey = factory
+ }
+}
diff --git a/global/tls_config.go b/global/tls_config.go
index 84f79bd..f6b1e56 100644
--- a/global/tls_config.go
+++ b/global/tls_config.go
@@ -26,6 +26,7 @@
}
func DefaultTLSConfig() *TLSConfig {
+ // please refer to /config/tls_config.go NewTLSConfigBuilder
return nil
}
diff --git a/graceful_shutdown/common.go b/graceful_shutdown/common.go
index b12f742..82cc4cc 100644
--- a/graceful_shutdown/common.go
+++ b/graceful_shutdown/common.go
@@ -18,10 +18,25 @@
package graceful_shutdown
import (
- "github.com/dubbogo/gost/log/logger"
"time"
)
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func init() {
+ hessian.RegisterPOJO(&common.MetadataInfo{})
+ hessian.RegisterPOJO(&common.ServiceInfo{})
+ hessian.RegisterPOJO(&common.URL{})
+}
+
func parseDuration(timeout string, desc string, def time.Duration) time.Duration {
res, err := time.ParseDuration(timeout)
if err != nil {
diff --git a/graceful_shutdown/compat.go b/graceful_shutdown/compat.go
index b462a57..f8b80bb 100644
--- a/graceful_shutdown/compat.go
+++ b/graceful_shutdown/compat.go
@@ -18,9 +18,12 @@
package graceful_shutdown
import (
+ "go.uber.org/atomic"
+)
+
+import (
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
- "go.uber.org/atomic"
)
func compatShutdownConfig(c *global.ShutdownConfig) *config.ShutdownConfig {
diff --git a/graceful_shutdown/options.go b/graceful_shutdown/options.go
index d0a1122..e364bb9 100644
--- a/graceful_shutdown/options.go
+++ b/graceful_shutdown/options.go
@@ -17,26 +17,78 @@
package graceful_shutdown
-import "dubbo.apache.org/dubbo-go/v3/global"
+import (
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/global"
+)
var (
defOpts = &Options{
- shutdown: global.DefaultShutdownConfig(),
+ Shutdown: global.DefaultShutdownConfig(),
}
)
type Options struct {
- shutdown *global.ShutdownConfig
+ Shutdown *global.ShutdownConfig
}
-func defaultOptions() *Options {
+func DefaultOptions() *Options {
return defOpts
}
type Option func(*Options)
+func WithTimeout(timeout time.Duration) Option {
+ return func(opts *Options) {
+ opts.Shutdown.Timeout = timeout.String()
+ }
+}
+
+func WithStepTimeout(timeout time.Duration) Option {
+ return func(opts *Options) {
+ opts.Shutdown.StepTimeout = timeout.String()
+ }
+}
+
+func WithConsumerUpdateWaitTime(duration time.Duration) Option {
+ return func(opts *Options) {
+ opts.Shutdown.ConsumerUpdateWaitTime = duration.String()
+ }
+}
+
+// todo(DMwangnima): add more specified configuration API
+//func WithRejectRequestHandler(handler string) Option {
+// return func(opts *Options) {
+// opts.Shutdown.RejectRequestHandler = handler
+// }
+//}
+
+func WithoutInternalSignal() Option {
+ return func(opts *Options) {
+ signal := false
+ opts.Shutdown.InternalSignal = &signal
+ }
+}
+
+func WithOfflineRequestWindowTimeout(timeout time.Duration) Option {
+ return func(opts *Options) {
+ opts.Shutdown.OfflineRequestWindowTimeout = timeout.String()
+ }
+}
+
+func WithRejectRequest() Option {
+ return func(opts *Options) {
+ opts.Shutdown.RejectRequest.Store(true)
+ }
+}
+
+// ---------- For framework ----------
+
func WithShutdown_Config(cfg *global.ShutdownConfig) Option {
return func(opts *Options) {
- opts.shutdown = cfg
+ opts.Shutdown = cfg
}
}
diff --git a/graceful_shutdown/shutdown.go b/graceful_shutdown/shutdown.go
index 73e70c3..b560ed6 100644
--- a/graceful_shutdown/shutdown.go
+++ b/graceful_shutdown/shutdown.go
@@ -18,10 +18,6 @@
package graceful_shutdown
import (
- "dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/config"
- "github.com/dubbogo/gost/log/logger"
"os"
"os/signal"
"runtime/debug"
@@ -29,6 +25,16 @@
"time"
)
+import (
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/config"
+)
+
const (
// todo(DMwangnima): these descriptions and defaults could be wrapped by functions of Options
defaultTimeout = 60 * time.Second
@@ -52,11 +58,12 @@
func Init(opts ...Option) {
initOnce.Do(func() {
- newOpts := defaultOptions()
+ protocols = make(map[string]struct{})
+ newOpts := DefaultOptions()
for _, opt := range opts {
opt(newOpts)
}
- compatShutdown = compatShutdownConfig(newOpts.shutdown)
+ compatShutdown = compatShutdownConfig(newOpts.Shutdown)
// retrieve ShutdownConfig for gracefulShutdownFilter
cGracefulShutdownFilter, existcGracefulShutdownFilter := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
if !existcGracefulShutdownFilter {
diff --git a/loader.go b/loader.go
new file mode 100644
index 0000000..5679c25
--- /dev/null
+++ b/loader.go
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "runtime"
+ "strings"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+
+ "github.com/knadh/koanf"
+ "github.com/knadh/koanf/parsers/json"
+ "github.com/knadh/koanf/parsers/toml"
+ "github.com/knadh/koanf/parsers/yaml"
+ "github.com/knadh/koanf/providers/confmap"
+ "github.com/knadh/koanf/providers/rawbytes"
+
+ "github.com/pkg/errors"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/constant/file"
+ "dubbo.apache.org/dubbo-go/v3/config/parsers/properties"
+)
+
+var (
+ defaultActive = "default"
+ instanceOptions = defaultInstanceOptions()
+)
+
+func Load(opts ...LoaderConfOption) error {
+ // conf
+ conf := NewLoaderConf(opts...)
+ if conf.opts == nil {
+ koan := GetConfigResolver(conf)
+ koan = conf.MergeConfig(koan)
+ if err := koan.UnmarshalWithConf(instanceOptions.Prefix(),
+ instanceOptions, koanf.UnmarshalConf{Tag: "yaml"}); err != nil {
+ return err
+ }
+ } else {
+ instanceOptions = conf.opts
+ }
+
+ if err := instanceOptions.init(); err != nil {
+ return err
+ }
+
+ // todo(DMwangnima): use independent Consumer and Provider logic
+ return nil
+}
+
+type loaderConf struct {
+ suffix string // loaderConf file extension default yaml
+ path string // loaderConf file path default ./conf/dubbogo.yaml
+ delim string // loaderConf file delim default .
+ bytes []byte // config bytes
+ opts *InstanceOptions // user provide InstanceOptions built by WithXXX api
+ name string // config file name
+}
+
+func NewLoaderConf(opts ...LoaderConfOption) *loaderConf {
+ configFilePath := "../conf/dubbogo.yaml"
+ if configFilePathFromEnv := os.Getenv(constant.ConfigFileEnvKey); configFilePathFromEnv != "" {
+ configFilePath = configFilePathFromEnv
+ }
+ name, suffix := resolverFilePath(configFilePath)
+ conf := &loaderConf{
+ suffix: suffix,
+ path: absolutePath(configFilePath),
+ delim: ".",
+ name: name,
+ }
+ for _, opt := range opts {
+ opt.apply(conf)
+ }
+ if conf.opts != nil {
+ return conf
+ }
+ if len(conf.bytes) <= 0 {
+ if bytes, err := ioutil.ReadFile(conf.path); err != nil {
+ panic(err)
+ } else {
+ conf.bytes = bytes
+ }
+ }
+ return conf
+}
+
+type LoaderConfOption interface {
+ apply(vc *loaderConf)
+}
+
+type loaderConfigFunc func(*loaderConf)
+
+func (fn loaderConfigFunc) apply(vc *loaderConf) {
+ fn(vc)
+}
+
+// WithGenre set load config file suffix
+// Deprecated: replaced by WithSuffix
+func WithGenre(suffix string) LoaderConfOption {
+ return loaderConfigFunc(func(conf *loaderConf) {
+ g := strings.ToLower(suffix)
+ if err := checkFileSuffix(g); err != nil {
+ panic(err)
+ }
+ conf.suffix = g
+ })
+}
+
+// WithSuffix set load config file suffix
+func WithSuffix(suffix file.Suffix) LoaderConfOption {
+ return loaderConfigFunc(func(conf *loaderConf) {
+ conf.suffix = string(suffix)
+ })
+}
+
+// WithPath set load config path
+func WithPath(path string) LoaderConfOption {
+ return loaderConfigFunc(func(conf *loaderConf) {
+ conf.path = absolutePath(path)
+ if bytes, err := ioutil.ReadFile(conf.path); err != nil {
+ panic(err)
+ } else {
+ conf.bytes = bytes
+ }
+ name, suffix := resolverFilePath(path)
+ conf.suffix = suffix
+ conf.name = name
+ })
+}
+
+func WithInstanceOptions(opts *InstanceOptions) LoaderConfOption {
+ return loaderConfigFunc(func(conf *loaderConf) {
+ conf.opts = opts
+ })
+}
+
+func WithDelim(delim string) LoaderConfOption {
+ return loaderConfigFunc(func(conf *loaderConf) {
+ conf.delim = delim
+ })
+}
+
+// WithBytes set load config bytes
+func WithBytes(bytes []byte) LoaderConfOption {
+ return loaderConfigFunc(func(conf *loaderConf) {
+ conf.bytes = bytes
+ })
+}
+
+// absolutePath get absolut path
+func absolutePath(inPath string) string {
+
+ if inPath == "$HOME" || strings.HasPrefix(inPath, "$HOME"+string(os.PathSeparator)) {
+ inPath = userHomeDir() + inPath[5:]
+ }
+
+ if filepath.IsAbs(inPath) {
+ return filepath.Clean(inPath)
+ }
+
+ p, err := filepath.Abs(inPath)
+ if err == nil {
+ return filepath.Clean(p)
+ }
+
+ return ""
+}
+
+// userHomeDir get gopath
+func userHomeDir() string {
+ if runtime.GOOS == "windows" {
+ home := os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH")
+ if home == "" {
+ home = os.Getenv("USERPROFILE")
+ }
+ return home
+ }
+ return os.Getenv("HOME")
+}
+
+// checkFileSuffix check file suffix
+func checkFileSuffix(suffix string) error {
+ for _, g := range []string{"json", "toml", "yaml", "yml", "properties"} {
+ if g == suffix {
+ return nil
+ }
+ }
+ return errors.Errorf("no support file suffix: %s", suffix)
+}
+
+// resolverFilePath resolver file path
+// eg: give a ./conf/dubbogo.yaml return dubbogo and yaml
+func resolverFilePath(path string) (name, suffix string) {
+ paths := strings.Split(path, "/")
+ fileName := strings.Split(paths[len(paths)-1], ".")
+ if len(fileName) < 2 {
+ return fileName[0], string(file.YAML)
+ }
+ return fileName[0], fileName[1]
+}
+
+// MergeConfig merge config file
+func (conf *loaderConf) MergeConfig(koan *koanf.Koanf) *koanf.Koanf {
+ var (
+ activeKoan *koanf.Koanf
+ activeConf *loaderConf
+ )
+ active := koan.String("dubbo.profiles.active")
+ active = getLegalActive(active)
+ logger.Infof("The following profiles are active: %s", active)
+ if defaultActive != active {
+ path := conf.getActiveFilePath(active)
+ if !pathExists(path) {
+ logger.Debugf("Config file:%s not exist skip config merge", path)
+ return koan
+ }
+ activeConf = NewLoaderConf(WithPath(path))
+ activeKoan = GetConfigResolver(activeConf)
+ if err := koan.Merge(activeKoan); err != nil {
+ logger.Debugf("Config merge err %s", err)
+ }
+ }
+ return koan
+}
+
+func (conf *loaderConf) getActiveFilePath(active string) string {
+ suffix := constant.DotSeparator + conf.suffix
+ return strings.ReplaceAll(conf.path, suffix, "") + "-" + active + suffix
+}
+
+func pathExists(path string) bool {
+ if _, err := os.Stat(path); err == nil {
+ return true
+ } else {
+ return !os.IsNotExist(err)
+ }
+}
+
+// getLegalActive if active is null return default
+func getLegalActive(active string) string {
+ if len(active) == 0 {
+ return defaultActive
+ }
+ return active
+}
+
+// GetConfigResolver get config resolver
+func GetConfigResolver(conf *loaderConf) *koanf.Koanf {
+ var (
+ k *koanf.Koanf
+ err error
+ )
+ if len(conf.suffix) <= 0 {
+ conf.suffix = string(file.YAML)
+ }
+ if len(conf.delim) <= 0 {
+ conf.delim = "."
+ }
+ bytes := conf.bytes
+ if len(bytes) <= 0 {
+ panic(errors.New("bytes is nil,please set bytes or file path"))
+ }
+ k = koanf.New(conf.delim)
+
+ switch conf.suffix {
+ case "yaml", "yml":
+ err = k.Load(rawbytes.Provider(bytes), yaml.Parser())
+ case "json":
+ err = k.Load(rawbytes.Provider(bytes), json.Parser())
+ case "toml":
+ err = k.Load(rawbytes.Provider(bytes), toml.Parser())
+ case "properties":
+ err = k.Load(rawbytes.Provider(bytes), properties.Parser())
+ default:
+ err = errors.Errorf("no support %s file suffix", conf.suffix)
+ }
+
+ if err != nil {
+ panic(err)
+ }
+ return resolvePlaceholder(k)
+}
+
+// resolvePlaceholder replace ${xx} with real value
+func resolvePlaceholder(resolver *koanf.Koanf) *koanf.Koanf {
+ m := make(map[string]interface{})
+ for k, v := range resolver.All() {
+ s, ok := v.(string)
+ if !ok {
+ continue
+ }
+ newKey, defaultValue := checkPlaceholder(s)
+ if newKey == "" {
+ continue
+ }
+ m[k] = resolver.Get(newKey)
+ if m[k] == nil {
+ m[k] = defaultValue
+ }
+ }
+ err := resolver.Load(confmap.Provider(m, resolver.Delim()), nil)
+ if err != nil {
+ logger.Errorf("resolvePlaceholder error %s", err)
+ }
+ return resolver
+}
+
+func checkPlaceholder(s string) (newKey, defaultValue string) {
+ s = strings.TrimSpace(s)
+ if !strings.HasPrefix(s, file.PlaceholderPrefix) || !strings.HasSuffix(s, file.PlaceholderSuffix) {
+ return
+ }
+ s = s[len(file.PlaceholderPrefix) : len(s)-len(file.PlaceholderSuffix)]
+ indexColon := strings.Index(s, ":")
+ if indexColon == -1 {
+ newKey = strings.TrimSpace(s)
+ return
+ }
+ newKey = strings.TrimSpace(s[0:indexColon])
+ defaultValue = strings.TrimSpace(s[indexColon+1:])
+
+ return
+}
diff --git a/metadata/metadata.go b/metadata/metadata.go
new file mode 100644
index 0000000..b35ac29
--- /dev/null
+++ b/metadata/metadata.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+
+ perrors "github.com/pkg/errors"
+
+ "go.uber.org/atomic"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/metadata/service/exporter"
+)
+
+var (
+ exporting = &atomic.Bool{}
+)
+
+func ExportMetadataService() {
+ ms, err := extension.GetLocalMetadataService(constant.DefaultKey)
+ if err != nil {
+ logger.Warnf("could not init metadata service", err)
+ return
+ }
+
+ if exporting.Load() {
+ return
+ }
+
+ // In theory, we can use sync.Once
+ // But sync.Once is not reentrant.
+ // Now the invocation chain is createRegistry -> tryInitMetadataService -> metadataServiceExporter.export
+ // -> createRegistry -> initMetadataService...
+ // So using sync.Once will result in dead lock
+ exporting.Store(true)
+
+ expt := extension.GetMetadataServiceExporter(constant.DefaultKey, ms)
+ if expt == nil {
+ logger.Warnf("get metadata service exporter failed, pls check if you import _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/exporter/configurable\"")
+ return
+ }
+
+ err = expt.Export(nil)
+ if err != nil {
+ logger.Errorf("could not export the metadata service, err = %s", err.Error())
+ return
+ }
+
+ // report interface-app mapping
+ err = publishMapping(expt)
+ if err != nil {
+ logger.Errorf("Publish interface-application mapping failed, got error %#v", err)
+ }
+}
+
+// OnEvent only handle ServiceConfigExportedEvent
+func publishMapping(sc exporter.MetadataServiceExporter) error {
+ urls := sc.GetExportedURLs()
+
+ for _, u := range urls {
+ err := extension.GetGlobalServiceNameMapping().Map(u)
+ if err != nil {
+ return perrors.WithMessage(err, "could not map the service: "+u.String())
+ }
+ }
+ return nil
+}
diff --git a/metadata/service/local/service.go b/metadata/service/local/service.go
index aeac070..440953e 100644
--- a/metadata/service/local/service.go
+++ b/metadata/service/local/service.go
@@ -67,6 +67,7 @@
func GetLocalMetadataService() (service.MetadataService, error) {
metadataServiceInitOnce.Do(func() {
metadataServiceInstance = &MetadataService{
+ // todo(DMwangnima): use external config
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
exportedServiceURLs: &sync.Map{},
subscribedServiceURLs: &sync.Map{},
diff --git a/options.go b/options.go
index 65a667f..51e9b2d 100644
--- a/options.go
+++ b/options.go
@@ -18,7 +18,16 @@
package dubbo
import (
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
+ "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
type InstanceOptions struct {
@@ -66,176 +75,301 @@
opt(rc)
}
+ // remaining procedure is like RootConfig.Init() without RootConfig.Start()
+ // tasks of RootConfig.Start() would be decomposed to Client and Server
rcCompat := compatRootConfig(rc)
- if err := rcCompat.Init(); err != nil {
+ if err := rcCompat.Logger.Init(); err != nil { // init default logger
return err
}
+ if err := rcCompat.ConfigCenter.Init(rcCompat); err != nil {
+ logger.Infof("[Config Center] Config center doesn't start")
+ logger.Debugf("config center doesn't start because %s", err)
+ } else {
+ if err = rcCompat.Logger.Init(); err != nil { // init logger using config from config center again
+ return err
+ }
+ }
+
+ if err := rcCompat.Application.Init(); err != nil {
+ return err
+ }
+
+ // init user define
+ if err := rcCompat.Custom.Init(); err != nil {
+ return err
+ }
+
+ // init protocol
+ protocols := rcCompat.Protocols
+ if len(protocols) <= 0 {
+ protocol := &config.ProtocolConfig{}
+ protocols = make(map[string]*config.ProtocolConfig, 1)
+ protocols[constant.Dubbo] = protocol
+ rcCompat.Protocols = protocols
+ }
+ for _, protocol := range protocols {
+ if err := protocol.Init(); err != nil {
+ return err
+ }
+ }
+
+ // init registry
+ registries := rcCompat.Registries
+ if registries != nil {
+ for _, reg := range registries {
+ if err := reg.Init(); err != nil {
+ return err
+ }
+ }
+ }
+
+ if err := rcCompat.MetadataReport.Init(rcCompat); err != nil {
+ return err
+ }
+ if err := rcCompat.Metric.Init(); err != nil {
+ return err
+ }
+ for _, t := range rcCompat.Tracing {
+ if err := t.Init(); err != nil {
+ return err
+ }
+ }
+
+ routers := rcCompat.Router
+ if len(routers) > 0 {
+ for _, r := range routers {
+ if err := r.Init(); err != nil {
+ return err
+ }
+ }
+ rcCompat.Router = routers
+ }
+
+ // provider、consumer must last init
+ if err := rcCompat.Provider.Init(rcCompat); err != nil {
+ return err
+ }
+ if err := rcCompat.Consumer.Init(rcCompat); err != nil {
+ return err
+ }
+ if err := rcCompat.Shutdown.Init(); err != nil {
+ return err
+ }
+ config.SetRootConfig(*rcCompat)
return nil
}
+func (rc *InstanceOptions) Prefix() string {
+ return constant.Dubbo
+}
+
type InstanceOption func(*InstanceOptions)
-func WithApplication(opts ...global.ApplicationOption) InstanceOption {
- appCfg := new(global.ApplicationConfig)
- for _, opt := range opts {
- opt(appCfg)
- }
-
- return func(cfg *InstanceOptions) {
- cfg.Application = appCfg
+func WithOrganization(organization string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Organization = organization
}
}
-func WithProtocol(key string, opts ...global.ProtocolOption) InstanceOption {
- proCfg := new(global.ProtocolConfig)
+func WithName(name string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Name = name
+ }
+}
+
+func WithModule(module string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Module = module
+ }
+}
+
+func WithGroup(group string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Group = group
+ }
+}
+
+func WithVersion(version string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Version = version
+ }
+}
+
+func WithOwner(owner string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Owner = owner
+ }
+}
+
+func WithEnvironment(environment string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Environment = environment
+ }
+}
+
+func WithRemoteMetadata() InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.MetadataType = constant.RemoteMetadataStorageType
+ }
+}
+
+func WithTag(tag string) InstanceOption {
+ return func(opts *InstanceOptions) {
+ opts.Application.Tag = tag
+ }
+}
+
+func WithProtocol(key string, opts ...protocol.Option) InstanceOption {
+ proOpts := protocol.DefaultOptions()
for _, opt := range opts {
- opt(proCfg)
+ opt(proOpts)
}
- return func(cfg *InstanceOptions) {
- if cfg.Protocols == nil {
- cfg.Protocols = make(map[string]*global.ProtocolConfig)
+ return func(insOpts *InstanceOptions) {
+ if insOpts.Protocols == nil {
+ insOpts.Protocols = make(map[string]*global.ProtocolConfig)
}
- cfg.Protocols[key] = proCfg
+ insOpts.Protocols[key] = proOpts.Protocol
}
}
-func WithRegistry(key string, opts ...global.RegistryOption) InstanceOption {
- regCfg := new(global.RegistryConfig)
+func WithRegistry(key string, opts ...registry.Option) InstanceOption {
+ regOpts := registry.DefaultOptions()
for _, opt := range opts {
- opt(regCfg)
+ opt(regOpts)
}
- return func(cfg *InstanceOptions) {
- if cfg.Registries == nil {
- cfg.Registries = make(map[string]*global.RegistryConfig)
+ return func(insOpts *InstanceOptions) {
+ if insOpts.Registries == nil {
+ insOpts.Registries = make(map[string]*global.RegistryConfig)
}
- cfg.Registries[key] = regCfg
+ insOpts.Registries[key] = regOpts.Registry
}
}
-func WithConfigCenter(opts ...global.CenterOption) InstanceOption {
- ccCfg := new(global.CenterConfig)
+//func WithConfigCenter(opts ...global.CenterOption) InstanceOption {
+// ccCfg := new(global.CenterConfig)
+// for _, opt := range opts {
+// opt(ccCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.ConfigCenter = ccCfg
+// }
+//}
+
+//func WithMetadataReport(opts ...global.MetadataReportOption) InstanceOption {
+// mrCfg := new(global.MetadataReportConfig)
+// for _, opt := range opts {
+// opt(mrCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.MetadataReport = mrCfg
+// }
+//}
+
+//func WithConsumer(opts ...global.ConsumerOption) InstanceOption {
+// conCfg := new(global.ConsumerConfig)
+// for _, opt := range opts {
+// opt(conCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.Consumer = conCfg
+// }
+//}
+
+//func WithMetric(opts ...global.MetricOption) InstanceOption {
+// meCfg := new(global.MetricConfig)
+// for _, opt := range opts {
+// opt(meCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.Metric = meCfg
+// }
+//}
+
+//func WithTracing(key string, opts ...global.TracingOption) InstanceOption {
+// traCfg := new(global.TracingConfig)
+// for _, opt := range opts {
+// opt(traCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// if cfg.Tracing == nil {
+// cfg.Tracing = make(map[string]*global.TracingConfig)
+// }
+// cfg.Tracing[key] = traCfg
+// }
+//}
+
+//func WithLogger(opts ...global.LoggerOption) InstanceOption {
+// logCfg := new(global.LoggerConfig)
+// for _, opt := range opts {
+// opt(logCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.Logger = logCfg
+// }
+//}
+
+func WithShutdown(opts ...graceful_shutdown.Option) InstanceOption {
+ sdOpts := graceful_shutdown.DefaultOptions()
for _, opt := range opts {
- opt(ccCfg)
+ opt(sdOpts)
}
- return func(cfg *InstanceOptions) {
- cfg.ConfigCenter = ccCfg
+ return func(insOpts *InstanceOptions) {
+ insOpts.Shutdown = sdOpts.Shutdown
}
}
-func WithMetadataReport(opts ...global.MetadataReportOption) InstanceOption {
- mrCfg := new(global.MetadataReportConfig)
- for _, opt := range opts {
- opt(mrCfg)
- }
+// todo(DMwangnima): enumerate specific EventDispatcherType
+//func WithEventDispatcherType(typ string) InstanceOption {
+// return func(cfg *InstanceOptions) {
+// cfg.EventDispatcherType = typ
+// }
+//}
- return func(cfg *InstanceOptions) {
- cfg.MetadataReport = mrCfg
- }
-}
+//func WithCacheFile(file string) InstanceOption {
+// return func(cfg *InstanceOptions) {
+// cfg.CacheFile = file
+// }
+//}
-func WithConsumer(opts ...global.ConsumerOption) InstanceOption {
- conCfg := new(global.ConsumerConfig)
- for _, opt := range opts {
- opt(conCfg)
- }
+//func WithCustom(opts ...global.CustomOption) InstanceOption {
+// cusCfg := new(global.CustomConfig)
+// for _, opt := range opts {
+// opt(cusCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.Custom = cusCfg
+// }
+//}
- return func(cfg *InstanceOptions) {
- cfg.Consumer = conCfg
- }
-}
+//func WithProfiles(opts ...global.ProfilesOption) InstanceOption {
+// proCfg := new(global.ProfilesConfig)
+// for _, opt := range opts {
+// opt(proCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.Profiles = proCfg
+// }
+//}
-func WithMetric(opts ...global.MetricOption) InstanceOption {
- meCfg := new(global.MetricConfig)
- for _, opt := range opts {
- opt(meCfg)
- }
-
- return func(cfg *InstanceOptions) {
- cfg.Metric = meCfg
- }
-}
-
-func WithTracing(key string, opts ...global.TracingOption) InstanceOption {
- traCfg := new(global.TracingConfig)
- for _, opt := range opts {
- opt(traCfg)
- }
-
- return func(cfg *InstanceOptions) {
- if cfg.Tracing == nil {
- cfg.Tracing = make(map[string]*global.TracingConfig)
- }
- cfg.Tracing[key] = traCfg
- }
-}
-
-func WithLogger(opts ...global.LoggerOption) InstanceOption {
- logCfg := new(global.LoggerConfig)
- for _, opt := range opts {
- opt(logCfg)
- }
-
- return func(cfg *InstanceOptions) {
- cfg.Logger = logCfg
- }
-}
-
-func WithShutdown(opts ...global.ShutdownOption) InstanceOption {
- sdCfg := new(global.ShutdownConfig)
- for _, opt := range opts {
- opt(sdCfg)
- }
-
- return func(cfg *InstanceOptions) {
- cfg.Shutdown = sdCfg
- }
-}
-
-func WithEventDispatcherType(typ string) InstanceOption {
- return func(cfg *InstanceOptions) {
- cfg.EventDispatcherType = typ
- }
-}
-
-func WithCacheFile(file string) InstanceOption {
- return func(cfg *InstanceOptions) {
- cfg.CacheFile = file
- }
-}
-
-func WithCustom(opts ...global.CustomOption) InstanceOption {
- cusCfg := new(global.CustomConfig)
- for _, opt := range opts {
- opt(cusCfg)
- }
-
- return func(cfg *InstanceOptions) {
- cfg.Custom = cusCfg
- }
-}
-
-func WithProfiles(opts ...global.ProfilesOption) InstanceOption {
- proCfg := new(global.ProfilesConfig)
- for _, opt := range opts {
- opt(proCfg)
- }
-
- return func(cfg *InstanceOptions) {
- cfg.Profiles = proCfg
- }
-}
-
-func WithTLS(opts ...global.TLSOption) InstanceOption {
- tlsCfg := new(global.TLSConfig)
- for _, opt := range opts {
- opt(tlsCfg)
- }
-
- return func(cfg *InstanceOptions) {
- cfg.TLSConfig = tlsCfg
- }
-}
+//func WithTLS(opts ...global.TLSOption) InstanceOption {
+// tlsCfg := new(global.TLSConfig)
+// for _, opt := range opts {
+// opt(tlsCfg)
+// }
+//
+// return func(cfg *InstanceOptions) {
+// cfg.TLSConfig = tlsCfg
+// }
+//}
diff --git a/protocol/dubbo3/dubbo3_protocol.go b/protocol/dubbo3/dubbo3_protocol.go
index 8d824b9..c3b6e7f 100644
--- a/protocol/dubbo3/dubbo3_protocol.go
+++ b/protocol/dubbo3/dubbo3_protocol.go
@@ -40,7 +40,6 @@
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
@@ -49,7 +48,8 @@
var protocolOnce sync.Once
func init() {
- extension.SetProtocol(tripleConstant.TRIPLE, GetProtocol)
+ // todo(DMwangnima): deprecated
+ //extension.SetProtocol(tripleConstant.TRIPLE, GetProtocol)
protocolOnce = sync.Once{}
}
diff --git a/protocol/options.go b/protocol/options.go
new file mode 100644
index 0000000..6373e59
--- /dev/null
+++ b/protocol/options.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package protocol
+
+import (
+ "strconv"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/global"
+)
+
+type Options struct {
+ Protocol *global.ProtocolConfig
+}
+
+func DefaultOptions() *Options {
+ return &Options{Protocol: global.DefaultProtocolConfig()}
+}
+
+type Option func(*Options)
+
+func WithDubbo() Option {
+ return func(opts *Options) {
+ opts.Protocol.Name = "dubbo"
+ }
+}
+
+func WithGRPC() Option {
+ return func(opts *Options) {
+ opts.Protocol.Name = "grpc"
+ }
+}
+
+func WithJSONRPC() Option {
+ return func(opts *Options) {
+ opts.Protocol.Name = "jsonrpc"
+ }
+}
+
+func WithREST() Option {
+ return func(opts *Options) {
+ opts.Protocol.Name = "rest"
+ }
+}
+
+func WithTriple() Option {
+ return func(opts *Options) {
+ opts.Protocol.Name = "tri"
+ }
+}
+
+func WithIp(ip string) Option {
+ return func(opts *Options) {
+ opts.Protocol.Ip = ip
+ }
+}
+
+func WithPort(port int) Option {
+ return func(opts *Options) {
+ opts.Protocol.Port = strconv.Itoa(port)
+ }
+}
+
+func WithParams(params interface{}) Option {
+ return func(opts *Options) {
+ opts.Protocol.Params = params
+ }
+}
+
+func WithMaxServerSendMsgSize(size int) Option {
+ return func(opts *Options) {
+ opts.Protocol.MaxServerSendMsgSize = strconv.Itoa(size)
+ }
+}
+
+func WithMaxServerRecvMsgSize(size int) Option {
+ return func(opts *Options) {
+ opts.Protocol.MaxServerRecvMsgSize = strconv.Itoa(size)
+ }
+}
diff --git a/protocol/triple/client.go b/protocol/triple/client.go
index ff11db4..9d1f3fa 100644
--- a/protocol/triple/client.go
+++ b/protocol/triple/client.go
@@ -28,8 +28,6 @@
)
import (
- "github.com/dubbogo/gost/log/logger"
-
"github.com/dustin/go-humanize"
"golang.org/x/net/http2"
@@ -38,7 +36,6 @@
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/config"
tri "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
)
@@ -137,6 +134,16 @@
}
triClientOpts = append(triClientOpts, tri.WithSendMaxBytes(maxCallSendMsgSize))
+ // set serialization
+ serialization := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
+ switch serialization {
+ case constant.ProtobufSerialization:
+ case constant.JSONSerialization:
+ triClientOpts = append(triClientOpts, tri.WithProtoJSON())
+ default:
+ panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
+ }
+
// todo:// process timeout
// consumer config client connectTimeout
//connectTimeout := config.GetConsumerConfig().ConnectTimeout
@@ -157,22 +164,22 @@
// )
var cfg *tls.Config
var tlsFlag bool
- var err error
+ //var err error
- // todo: move TLSConfig from root to consumer
- if tlsConfig := config.GetRootConfig().TLSConfig; tlsConfig != nil {
- cfg, err = config.GetClientTlsConfig(&config.TLSConfig{
- CACertFile: tlsConfig.CACertFile,
- TLSCertFile: tlsConfig.TLSCertFile,
- TLSKeyFile: tlsConfig.TLSKeyFile,
- TLSServerName: tlsConfig.TLSServerName,
- })
- if err != nil {
- return nil, err
- }
- logger.Infof("TRIPLE clientManager initialized the TLSConfig configuration successfully")
- tlsFlag = true
- }
+ // todo: think about a more elegant way to configure tls
+ //if tlsConfig := config.GetRootConfig().TLSConfig; tlsConfig != nil {
+ // cfg, err = config.GetClientTlsConfig(&config.TLSConfig{
+ // CACertFile: tlsConfig.CACertFile,
+ // TLSCertFile: tlsConfig.TLSCertFile,
+ // TLSKeyFile: tlsConfig.TLSKeyFile,
+ // TLSServerName: tlsConfig.TLSServerName,
+ // })
+ // if err != nil {
+ // return nil, err
+ // }
+ // logger.Infof("TRIPLE clientManager initialized the TLSConfig configuration successfully")
+ // tlsFlag = true
+ //}
// todo(DMwangnima): this code fragment would be used to be compatible with old triple client
//key := url.GetParam(constant.InterfaceKey, "")
@@ -205,7 +212,7 @@
}
triClientOpts = append(triClientOpts)
default:
- panic(fmt.Sprintf("Unsupported type: %s", callType))
+ panic(fmt.Sprintf("Unsupported callType: %s", callType))
}
httpClient := &http.Client{
Transport: transport,
diff --git a/protocol/triple/internal/client/cmd_classic/dubbogo.yml b/protocol/triple/internal/client/cmd_classic/dubbogo.yml
deleted file mode 100644
index acf53b3..0000000
--- a/protocol/triple/internal/client/cmd_classic/dubbogo.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-dubbo:
- registries:
- demoZK:
- protocol: zookeeper
- timeout: 3s
- address: 127.0.0.1:2181
- consumer:
- references:
- greet.GreetService:
- protocol: grpc_new
- interface: greet.GreetService
\ No newline at end of file
diff --git a/protocol/triple/internal/client/cmd_classic/main.go b/protocol/triple/internal/client/cmd_classic/main.go
deleted file mode 100644
index fcda071..0000000
--- a/protocol/triple/internal/client/cmd_classic/main.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package main
-
-import (
- "context"
- "fmt"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/config"
- _ "dubbo.apache.org/dubbo-go/v3/imports"
- greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
- "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
-)
-
-func main() {
- svc := new(greettriple.GreetServiceImpl)
- greettriple.SetConsumerService(svc)
- if err := config.Load(); err != nil {
- panic(err)
- }
-
- resp, err := svc.Greet(context.Background(), &greet.GreetRequest{Name: "dubbo"})
- if err != nil {
- panic(err)
- }
-
- fmt.Println(resp.Greeting)
-}
diff --git a/protocol/triple/internal/client/cmd_client/main.go b/protocol/triple/internal/client/cmd_client/main.go
index a630542..7adb0fa 100644
--- a/protocol/triple/internal/client/cmd_client/main.go
+++ b/protocol/triple/internal/client/cmd_client/main.go
@@ -1,20 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package main
import (
- "context"
- "fmt"
-)
-
-import (
"dubbo.apache.org/dubbo-go/v3/client"
- greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
+ _ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/client/common"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
)
func main() {
// for the most brief RPC case
cli, err := client.NewClient(
- client.WithURL("127.0.0.1:20000"),
+ client.WithURL("tri://127.0.0.1:20000"),
)
if err != nil {
panic(err)
@@ -24,10 +37,5 @@
panic(err)
}
- resp, err := svc.Greet(context.Background(), &greet.GreetRequest{Name: "dubbo"})
- if err != nil {
- panic(err)
- }
-
- fmt.Println(resp.Greeting)
+ common.TestClient(svc)
}
diff --git a/protocol/triple/internal/server/cmd/main.go b/protocol/triple/internal/client/cmd_client_with_registry/main.go
similarity index 70%
copy from protocol/triple/internal/server/cmd/main.go
copy to protocol/triple/internal/client/cmd_client_with_registry/main.go
index 4081a18..4d2acb9 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/client/cmd_client_with_registry/main.go
@@ -18,19 +18,29 @@
package main
import (
+ "dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/client/common"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
- "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
- "dubbo.apache.org/dubbo-go/v3/server"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
func main() {
- srv, err := server.NewServer()
+ // for the most brief RPC case with Registry
+
+ cli, err := client.NewClient(
+ client.WithRegistry("zk",
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ )
if err != nil {
panic(err)
}
- if err := greettriple.RegisterGreetServiceHandler(srv, &api.GreetTripleServer{}); err != nil {
+ svc, err := greettriple.NewGreetService(cli)
+ if err != nil {
panic(err)
}
- select {}
+
+ common.TestClient(svc)
}
diff --git a/protocol/triple/internal/client/cmd_instance/main.go b/protocol/triple/internal/client/cmd_instance/main.go
index e1c17aa..262ca49 100644
--- a/protocol/triple/internal/client/cmd_instance/main.go
+++ b/protocol/triple/internal/client/cmd_instance/main.go
@@ -1,15 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package main
import (
- "context"
- "fmt"
-)
-
-import (
- dubbo "dubbo.apache.org/dubbo-go/v3"
+ "dubbo.apache.org/dubbo-go/v3"
"dubbo.apache.org/dubbo-go/v3/client"
- "dubbo.apache.org/dubbo-go/v3/global"
- greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
+ _ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/client/common"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
)
@@ -17,22 +29,14 @@
// global conception
// configure global configurations and common modules
ins, err := dubbo.NewInstance(
- dubbo.WithApplication(
- global.WithApplication_Name("dubbo_test"),
- ),
- dubbo.WithRegistry("nacos",
- global.WithRegistry_Address("127.0.0.1:8848"),
- ),
- dubbo.WithMetric(
- global.WithMetric_Enable(true),
- ),
+ dubbo.WithName("dubbo_test"),
)
if err != nil {
panic(err)
}
// configure the params that only client layer cares
cli, err := ins.NewClient(
- client.WithRetries(3),
+ client.WithURL("tri://127.0.0.1:20000"),
)
if err != nil {
panic(err)
@@ -43,10 +47,5 @@
panic(err)
}
- resp, err := svc.Greet(context.Background(), &greet.GreetRequest{Name: "dubbo"})
- if err != nil {
- panic(err)
- }
-
- fmt.Println(resp.Greeting)
+ common.TestClient(svc)
}
diff --git a/protocol/triple/internal/server/cmd/main.go b/protocol/triple/internal/client/cmd_instance_with_registry/main.go
similarity index 60%
copy from protocol/triple/internal/server/cmd/main.go
copy to protocol/triple/internal/client/cmd_instance_with_registry/main.go
index 4081a18..38f9d3a 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/client/cmd_instance_with_registry/main.go
@@ -18,19 +18,39 @@
package main
import (
+ "dubbo.apache.org/dubbo-go/v3"
+ "dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/client/common"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
- "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
- "dubbo.apache.org/dubbo-go/v3/server"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
func main() {
- srv, err := server.NewServer()
+ // global conception
+ // configure global configurations and common modules
+ ins, err := dubbo.NewInstance(
+ dubbo.WithName("dubbo_test"),
+ dubbo.WithRegistry("zk",
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ )
if err != nil {
panic(err)
}
- if err := greettriple.RegisterGreetServiceHandler(srv, &api.GreetTripleServer{}); err != nil {
+ // configure the params that only client layer cares
+ cli, err := ins.NewClient(
+ client.WithRegistryIDs([]string{"zk"}),
+ )
+ if err != nil {
panic(err)
}
- select {}
+
+ svc, err := greettriple.NewGreetService(cli)
+ if err != nil {
+ panic(err)
+ }
+
+ common.TestClient(svc)
}
diff --git a/protocol/triple/internal/client/common/client.go b/protocol/triple/internal/client/common/client.go
new file mode 100644
index 0000000..af7f257
--- /dev/null
+++ b/protocol/triple/internal/client/common/client.go
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "context"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+ greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
+)
+
+func TestClient(cli greettriple.GreetService) {
+ if err := testUnary(cli); err != nil {
+ logger.Error(err)
+ }
+
+ if err := testBidiStream(cli); err != nil {
+ logger.Error(err)
+ }
+
+ if err := testClientStream(cli); err != nil {
+ logger.Error(err)
+ }
+
+ if err := testServerStream(cli); err != nil {
+ logger.Error(err)
+ }
+}
+
+func testUnary(cli greettriple.GreetService) error {
+ logger.Info("start to test TRIPLE unary call")
+ resp, err := cli.Greet(context.Background(), &greet.GreetRequest{Name: "triple"})
+ if err != nil {
+ return err
+ }
+ logger.Infof("TRIPLE unary call resp: %s", resp.Greeting)
+ return nil
+}
+
+func testBidiStream(cli greettriple.GreetService) error {
+ logger.Info("start to test TRIPLE bidi stream")
+ stream, err := cli.GreetStream(context.Background())
+ if err != nil {
+ return err
+ }
+ if err := stream.Send(&greet.GreetStreamRequest{Name: "triple"}); err != nil {
+ return err
+ }
+ resp, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+ logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
+ if err := stream.CloseRequest(); err != nil {
+ return err
+ }
+ if err := stream.CloseResponse(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func testClientStream(cli greettriple.GreetService) error {
+ logger.Info("start to test TRIPLE client stream")
+ stream, err := cli.GreetClientStream(context.Background())
+ if err != nil {
+ return err
+ }
+ for i := 0; i < 5; i++ {
+ if err := stream.Send(&greet.GreetClientStreamRequest{Name: "triple"}); err != nil {
+ return err
+ }
+ }
+ resp, err := stream.CloseAndRecv()
+ if err != nil {
+ return err
+ }
+ logger.Infof("TRIPLE client stream resp: %s", resp.Greeting)
+ return nil
+}
+
+func testServerStream(cli greettriple.GreetService) error {
+ logger.Info("start to test TRIPLE server stream")
+ stream, err := cli.GreetServerStream(context.Background(), &greet.GreetServerStreamRequest{Name: "triple"})
+ if err != nil {
+ return err
+ }
+ for stream.Recv() {
+ logger.Infof("TRIPLE server stream resp: %s", stream.Msg().Greeting)
+ }
+ if stream.Err() != nil {
+ return err
+ }
+ if err := stream.Close(); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/protocol/triple/internal/dubbo3_server/cmd/dubbogo.yml b/protocol/triple/internal/dubbo3_server/cmd/dubbogo.yml
new file mode 100644
index 0000000..a3a562f
--- /dev/null
+++ b/protocol/triple/internal/dubbo3_server/cmd/dubbogo.yml
@@ -0,0 +1,13 @@
+dubbo:
+# registries:
+# zk:
+# address: zookeeper://127.0.0.1:2181
+ protocols:
+ triple:
+ name: tri
+ port: 20001
+ provider:
+ services:
+ GreetDubbo3Server:
+ # interface is for registry
+ interface: greet.GreetService
\ No newline at end of file
diff --git a/protocol/triple/internal/dubbo3_server/cmd/main.go b/protocol/triple/internal/dubbo3_server/cmd/main.go
new file mode 100644
index 0000000..aac22da
--- /dev/null
+++ b/protocol/triple/internal/dubbo3_server/cmd/main.go
@@ -0,0 +1,15 @@
+package main
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/config"
+ _ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/dubbo3_server/api"
+)
+
+func main() {
+ config.SetProviderService(&api.GreetDubbo3Server{})
+ if err := config.Load(config.WithPath("./dubbogo.yml")); err != nil {
+ panic(err)
+ }
+ select {}
+}
diff --git a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
index 066f6c9..69f1046 100644
--- a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
+++ b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
@@ -87,10 +87,8 @@
}
func (c *GreetServiceImpl) Greet(ctx context.Context, req *proto.GreetRequest, opts ...client.CallOption) (*proto.GreetResponse, error) {
- triReq := triple_protocol.NewRequest(req)
resp := new(proto.GreetResponse)
- triResp := triple_protocol.NewResponse(resp)
- if err := c.cli.CallUnary(ctx, triReq, triResp, "greet.GreetService", "Greet", opts...); err != nil {
+ if err := c.cli.CallUnary(ctx, req, resp, "greet.GreetService", "Greet", opts...); err != nil {
return nil, err
}
return resp, nil
@@ -115,8 +113,7 @@
}
func (c *GreetServiceImpl) GreetServerStream(ctx context.Context, req *proto.GreetServerStreamRequest, opts ...client.CallOption) (GreetService_GreetServerStreamClient, error) {
- triReq := triple_protocol.NewRequest(req)
- stream, err := c.cli.CallServerStream(ctx, triReq, "greet.GreetService", "GreetServerStream", opts...)
+ stream, err := c.cli.CallServerStream(ctx, req, "greet.GreetService", "GreetServerStream", opts...)
if err != nil {
return nil, err
}
@@ -230,8 +227,8 @@
GreetServerStream(context.Context, *proto.GreetServerStreamRequest, GreetService_GreetServerStreamServer) error
}
-func RegisterGreetServiceHandler(srv *server.Server, hdlr GreetServiceHandler) error {
- return srv.Register(hdlr, &GreetService_ServiceInfo)
+func RegisterGreetServiceHandler(srv *server.Server, hdlr GreetServiceHandler, opts ...server.ServiceOption) error {
+ return srv.Register(hdlr, &GreetService_ServiceInfo, opts...)
}
type GreetService_GreetStreamServer interface {
diff --git a/protocol/triple/internal/server/cmd/dubbogo.yml b/protocol/triple/internal/server/cmd/dubbogo.yml
deleted file mode 100644
index 75d1986..0000000
--- a/protocol/triple/internal/server/cmd/dubbogo.yml
+++ /dev/null
@@ -1,16 +0,0 @@
-dubbo:
- registries:
- demoZK:
- protocol: zookeeper
- address: 127.0.0.1:2181
- protocols:
- grpc_new:
- name: grpc_new
- port: 20000
- provider:
- services:
- greet.GreetService:
- interface: greet.GreetService
- proxy: default
- protocol-ids:
- - grpc_new
\ No newline at end of file
diff --git a/protocol/triple/internal/server/cmd/main.go b/protocol/triple/internal/server/cmd_instance/main.go
similarity index 75%
copy from protocol/triple/internal/server/cmd/main.go
copy to protocol/triple/internal/server/cmd_instance/main.go
index 4081a18..4c0006a 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/server/cmd_instance/main.go
@@ -18,19 +18,31 @@
package main
import (
+ "dubbo.apache.org/dubbo-go/v3"
_ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
- "dubbo.apache.org/dubbo-go/v3/server"
)
func main() {
- srv, err := server.NewServer()
+ // global conception
+ // configure global configurations and common modules
+ ins, err := dubbo.NewInstance(
+ dubbo.WithName("dubbo_test"),
+ dubbo.WithProtocol("tri",
+ protocol.WithTriple(),
+ protocol.WithPort(20000),
+ ),
+ )
+ srv, err := ins.NewServer()
if err != nil {
panic(err)
}
if err := greettriple.RegisterGreetServiceHandler(srv, &api.GreetTripleServer{}); err != nil {
panic(err)
}
- select {}
+ if err := srv.Serve(); err != nil {
+ panic(err)
+ }
}
diff --git a/protocol/triple/internal/server/cmd/main.go b/protocol/triple/internal/server/cmd_instance_with_registry/main.go
similarity index 68%
copy from protocol/triple/internal/server/cmd/main.go
copy to protocol/triple/internal/server/cmd_instance_with_registry/main.go
index 4081a18..070421c 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/server/cmd_instance_with_registry/main.go
@@ -18,19 +18,36 @@
package main
import (
+ "dubbo.apache.org/dubbo-go/v3"
_ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
- "dubbo.apache.org/dubbo-go/v3/server"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
func main() {
- srv, err := server.NewServer()
+ // global conception
+ // configure global configurations and common modules
+ ins, err := dubbo.NewInstance(
+ dubbo.WithName("dubbo_test"),
+ dubbo.WithRegistry("zk",
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ dubbo.WithProtocol("tri",
+ protocol.WithTriple(),
+ protocol.WithPort(20000),
+ ),
+ )
+ srv, err := ins.NewServer()
if err != nil {
panic(err)
}
if err := greettriple.RegisterGreetServiceHandler(srv, &api.GreetTripleServer{}); err != nil {
panic(err)
}
- select {}
+ if err := srv.Serve(); err != nil {
+ panic(err)
+ }
}
diff --git a/protocol/triple/internal/server/cmd/main.go b/protocol/triple/internal/server/cmd_server/main.go
similarity index 84%
rename from protocol/triple/internal/server/cmd/main.go
rename to protocol/triple/internal/server/cmd_server/main.go
index 4081a18..d7c5e5b 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/server/cmd_server/main.go
@@ -19,18 +19,26 @@
import (
_ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
"dubbo.apache.org/dubbo-go/v3/server"
)
func main() {
- srv, err := server.NewServer()
+ srv, err := server.NewServer(
+ server.WithServer_Protocol("tri",
+ protocol.WithTriple(),
+ protocol.WithPort(20000),
+ ),
+ )
if err != nil {
panic(err)
}
if err := greettriple.RegisterGreetServiceHandler(srv, &api.GreetTripleServer{}); err != nil {
panic(err)
}
- select {}
+ if err := srv.Serve(); err != nil {
+ panic(err)
+ }
}
diff --git a/protocol/triple/internal/server/cmd/main.go b/protocol/triple/internal/server/cmd_server_with_registry/main.go
similarity index 76%
copy from protocol/triple/internal/server/cmd/main.go
copy to protocol/triple/internal/server/cmd_server_with_registry/main.go
index 4081a18..de2cc46 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/server/cmd_server_with_registry/main.go
@@ -19,18 +19,31 @@
import (
_ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/server/api"
+ "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/server"
)
func main() {
- srv, err := server.NewServer()
+ srv, err := server.NewServer(
+ server.WithServer_Registry("zk",
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ server.WithServer_Protocol("tri",
+ protocol.WithTriple(),
+ protocol.WithPort(20000),
+ ),
+ )
if err != nil {
panic(err)
}
if err := greettriple.RegisterGreetServiceHandler(srv, &api.GreetTripleServer{}); err != nil {
panic(err)
}
- select {}
+ if err := srv.Serve(); err != nil {
+ panic(err)
+ }
}
diff --git a/protocol/triple/server.go b/protocol/triple/server.go
index e8219cf..64ece57 100644
--- a/protocol/triple/server.go
+++ b/protocol/triple/server.go
@@ -82,6 +82,14 @@
}
hanOpts = append(hanOpts, tri.WithSendMaxBytes(maxServerSendMsgSize))
+ serialization := URL.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
+ switch serialization {
+ case constant.ProtobufSerialization:
+ case constant.JSONSerialization:
+ default:
+ panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
+ }
+
// todo: implement interceptor
// If global trace instance was set, then server tracer instance
// can be get. If not, will return NoopTracer.
@@ -93,20 +101,21 @@
// grpc.MaxSendMsgSize(maxServerSendMsgSize),
//)
var cfg *tls.Config
- tlsConfig := config.GetRootConfig().TLSConfig
- if tlsConfig != nil {
- cfg, err = config.GetServerTlsConfig(&config.TLSConfig{
- CACertFile: tlsConfig.CACertFile,
- TLSCertFile: tlsConfig.TLSCertFile,
- TLSKeyFile: tlsConfig.TLSKeyFile,
- TLSServerName: tlsConfig.TLSServerName,
- })
- if err != nil {
- return
- }
- logger.Infof("Triple Server initialized the TLSConfig configuration")
- }
- srv.TLSConfig = cfg
+ // todo(DMwangnima): think about a more elegant way to configure tls
+ //tlsConfig := config.GetRootConfig().TLSConfig
+ //if tlsConfig != nil {
+ // cfg, err = config.GetServerTlsConfig(&config.TLSConfig{
+ // CACertFile: tlsConfig.CACertFile,
+ // TLSCertFile: tlsConfig.TLSCertFile,
+ // TLSKeyFile: tlsConfig.TLSKeyFile,
+ // TLSServerName: tlsConfig.TLSServerName,
+ // })
+ // if err != nil {
+ // return
+ // }
+ // logger.Infof("Triple Server initialized the TLSConfig configuration")
+ //}
+ //srv.TLSConfig = cfg
// todo:// open tracing
hanOpts = append(hanOpts, tri.WithInterceptors())
@@ -186,7 +195,9 @@
serviceKey := common.ServiceKey(providerService.Interface, providerService.Group, providerService.Version)
exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
if exporter == nil {
- panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
+ // todo(DMwangnima): handler reflection Service and health Service
+ continue
+ //panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker == nil {
diff --git a/protocol/triple/triple-tool/gen/generator/genTriple.go b/protocol/triple/triple-tool/gen/generator/genTriple.go
index 0a0644e..246549b 100644
--- a/protocol/triple/triple-tool/gen/generator/genTriple.go
+++ b/protocol/triple/triple-tool/gen/generator/genTriple.go
@@ -25,6 +25,7 @@
import (
"github.com/emicklei/proto"
+
"github.com/golang/protobuf/protoc-gen-go/descriptor"
)
diff --git a/protocol/triple/triple.go b/protocol/triple/triple.go
index 4ec0742..cb64aa4 100644
--- a/protocol/triple/triple.go
+++ b/protocol/triple/triple.go
@@ -27,6 +27,7 @@
import (
"dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/server"
@@ -34,7 +35,7 @@
const (
// TRIPLE protocol name
- TRIPLE = "triple"
+ TRIPLE = "tri"
)
var (
@@ -56,9 +57,10 @@
url := invoker.GetURL()
serviceKey := url.ServiceKey()
// todo: retrieve this info from url
- info := &server.ServiceInfo{
- InterfaceName: url.Path,
- Methods: url.MethodInfo,
+ var info *server.ServiceInfo
+ infoRaw, ok := url.Attributes[constant.ServiceInfoKey]
+ if ok {
+ info = infoRaw.(*server.ServiceInfo)
}
exporter := NewTripleExporter(serviceKey, invoker, tp.ExporterMap())
tp.SetExporterMap(serviceKey, exporter)
diff --git a/registry/exposed_tmp/exposed.go b/registry/exposed_tmp/exposed.go
new file mode 100644
index 0000000..f6861e0
--- /dev/null
+++ b/registry/exposed_tmp/exposed.go
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package exposed_tmp
+
+import (
+ "reflect"
+ "strconv"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+// RegisterServiceInstance register service instance
+func RegisterServiceInstance(applicationName string, tag string, metadataType string) {
+ url := selectMetadataServiceExportedURL()
+ if url == nil {
+ return
+ }
+ instance, err := createInstance(url, applicationName, tag, metadataType)
+ if err != nil {
+ panic(err)
+ }
+ p := extension.GetProtocol(constant.RegistryProtocol)
+ var rp registry.RegistryFactory
+ var ok bool
+ if rp, ok = p.(registry.RegistryFactory); !ok {
+ panic("dubbo registry protocol{" + reflect.TypeOf(p).String() + "} is invalid")
+ }
+ rs := rp.GetRegistries()
+ for _, r := range rs {
+ var sdr registry.ServiceDiscoveryHolder
+ if sdr, ok = r.(registry.ServiceDiscoveryHolder); !ok {
+ continue
+ }
+ // publish app level data to registry
+ logger.Infof("Starting register instance address %v", instance)
+ err := sdr.GetServiceDiscovery().Register(instance)
+ if err != nil {
+ panic(err)
+ }
+ }
+ // publish metadata to remote
+ if metadataType == constant.RemoteMetadataStorageType {
+ if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
+ remoteMetadataService.PublishMetadata(applicationName)
+ }
+ }
+}
+
+// // nolint
+func createInstance(url *common.URL, applicationName string, tag string, metadataType string) (registry.ServiceInstance, error) {
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host = common.GetLocalIp()
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.MetadataStorageTypePropertyName] = metadataType
+
+ instance := ®istry.DefaultServiceInstance{
+ ServiceName: applicationName,
+ Host: host,
+ Port: int(port),
+ ID: host + constant.KeySeparator + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ Tag: tag,
+ }
+
+ for _, cus := range extension.GetCustomizers() {
+ cus.Customize(instance)
+ }
+
+ return instance, nil
+}
+
+// selectMetadataServiceExportedURL get already be exported url
+func selectMetadataServiceExportedURL() *common.URL {
+ var selectedUrl *common.URL
+ metaDataService, err := extension.GetLocalMetadataService(constant.DefaultKey)
+ if err != nil {
+ logger.Warnf("get metadata service exporter failed, pls check if you import _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"")
+ return nil
+ }
+ urlList, err := metaDataService.GetExportedURLs(constant.AnyValue, constant.AnyValue, constant.AnyValue, constant.AnyValue)
+ if err != nil {
+ panic(err)
+ }
+ if len(urlList) == 0 {
+ return nil
+ }
+ for _, url := range urlList {
+ selectedUrl = url
+ // rest first
+ if url.Protocol == "rest" {
+ break
+ }
+ }
+ return selectedUrl
+}
diff --git a/registry/options.go b/registry/options.go
new file mode 100644
index 0000000..af40112
--- /dev/null
+++ b/registry/options.go
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/global"
+)
+
+type Options struct {
+ Registry *global.RegistryConfig
+}
+
+func DefaultOptions() *Options {
+ return &Options{Registry: global.DefaultRegistryConfig()}
+}
+
+type Option func(*Options)
+
+func WithEtcdV3() Option {
+ return func(opts *Options) {
+ // todo(DMwangnima): move etcdv3 to constant
+ opts.Registry.Protocol = "etcdv3"
+ }
+}
+
+func WithNacos() Option {
+ return func(opts *Options) {
+ opts.Registry.Protocol = constant.NacosKey
+ }
+}
+
+func WithPolaris() Option {
+ return func(opts *Options) {
+ opts.Registry.Protocol = constant.PolarisKey
+ }
+}
+
+func WithXDS() Option {
+ return func(opts *Options) {
+ opts.Registry.Protocol = constant.XDSRegistryKey
+ }
+}
+
+func WithZookeeper() Option {
+ return func(opts *Options) {
+ // todo(DMwangnima): move zookeeper to constant
+ opts.Registry.Protocol = "zookeeper"
+ }
+}
+
+func WithTimeout(timeout time.Duration) Option {
+ return func(opts *Options) {
+ opts.Registry.Timeout = timeout.String()
+ }
+}
+
+func WithGroup(group string) Option {
+ return func(opts *Options) {
+ opts.Registry.Group = group
+ }
+}
+
+func WithNamespace(namespace string) Option {
+ return func(opts *Options) {
+ opts.Registry.Namespace = namespace
+ }
+}
+
+func WithTTL(ttl time.Duration) Option {
+ return func(opts *Options) {
+ opts.Registry.TTL = ttl.String()
+ }
+}
+
+func WithAddress(address string) Option {
+ return func(opts *Options) {
+ opts.Registry.Address = address
+ }
+}
+
+func WithUsername(name string) Option {
+ return func(opts *Options) {
+ opts.Registry.Username = name
+ }
+}
+
+func WithPassword(password string) Option {
+ return func(opts *Options) {
+ opts.Registry.Password = password
+ }
+}
+
+func WithSimplified() Option {
+ return func(opts *Options) {
+ opts.Registry.Simplified = true
+ }
+}
+
+func WithPreferred() Option {
+ return func(opts *Options) {
+ opts.Registry.Preferred = true
+ }
+}
+
+func WithZone(zone string) Option {
+ return func(opts *Options) {
+ opts.Registry.Zone = zone
+ }
+}
+
+func WithWeight(weight int64) Option {
+ return func(opts *Options) {
+ opts.Registry.Weight = weight
+ }
+}
+
+func WithParams(params map[string]string) Option {
+ return func(opts *Options) {
+ opts.Registry.Params = params
+ }
+}
+
+func WithRegisterServiceAndInterface() Option {
+ return func(opts *Options) {
+ opts.Registry.RegistryType = constant.RegistryTypeAll
+ }
+}
+
+func WithRegisterInterface() Option {
+ return func(opts *Options) {
+ opts.Registry.RegistryType = constant.RegistryTypeInterface
+ }
+}
+
+func WithoutUseAsMetaReport() Option {
+ return func(opts *Options) {
+ opts.Registry.UseAsMetaReport = false
+ }
+}
+
+func WithoutUseAsConfigCenter() Option {
+ return func(opts *Options) {
+ opts.Registry.UseAsConfigCenter = false
+ }
+}
diff --git a/server/action.go b/server/action.go
index 84f9e07..6f93321 100644
--- a/server/action.go
+++ b/server/action.go
@@ -19,7 +19,6 @@
import (
"container/list"
-
"fmt"
"net/url"
"os"
@@ -29,8 +28,6 @@
)
import (
- "github.com/creasty/defaults"
-
"github.com/dubbogo/gost/log/logger"
gxnet "github.com/dubbogo/gost/net"
@@ -41,58 +38,22 @@
import (
"dubbo.apache.org/dubbo-go/v3/common"
- commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/global"
+ "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
)
// Prefix returns dubbo.service.${InterfaceName}.
-func (s *ServerOptions) Prefix() string {
- return strings.Join([]string{constant.ServiceConfigPrefix, s.Id}, ".")
+func (svcOpts *ServiceOptions) Prefix() string {
+ return strings.Join([]string{constant.ServiceConfigPrefix, svcOpts.Id}, ".")
}
-func (s *ServerOptions) Init(opts ...ServerOption) error {
- for _, opt := range opts {
- opt(s)
- }
- if err := defaults.Set(s); err != nil {
- return err
- }
- for _, opt := range opts {
- opt(s)
- }
-
- srv := s.Server
-
- s.exported = atomic.NewBool(false)
-
- application := s.Application
- if application != nil {
- s.applicationCompat = compatApplicationConfig(application)
- if err := s.applicationCompat.Init(); err != nil {
- return err
- }
- s.metadataType = s.applicationCompat.MetadataType
- if srv.Group == "" {
- srv.Group = s.applicationCompat.Group
- }
- if srv.Version == "" {
- srv.Version = s.applicationCompat.Version
- }
- }
- s.unexported = atomic.NewBool(false)
- err := s.check()
- if err != nil {
- panic(err)
- }
- s.Export = true
- return commonCfg.Verify(s)
-}
-
-func (s *ServerOptions) check() error {
- srv := s.Server
+func (svcOpts *ServiceOptions) check() error {
+ srv := svcOpts.Service
// check if the limiter has been imported
if srv.TpsLimiter != "" {
_, err := extension.GetTpsLimiter(srv.TpsLimiter)
@@ -116,33 +77,33 @@
if srv.TpsLimitInterval != "" {
tpsLimitInterval, err := strconv.ParseInt(srv.TpsLimitInterval, 0, 0)
if err != nil {
- return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.interval for service %s, please check your configuration", srv.Interface)
+ return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.interval for service %svcOpts, please check your configuration", srv.Interface)
}
if tpsLimitInterval < 0 {
- return fmt.Errorf("[ServiceConfig] The configuration tps.limit.interval for service %s must be positive, please check your configuration", srv.Interface)
+ return fmt.Errorf("[ServiceConfig] The configuration tps.limit.interval for service %svcOpts must be positive, please check your configuration", srv.Interface)
}
}
if srv.TpsLimitRate != "" {
tpsLimitRate, err := strconv.ParseInt(srv.TpsLimitRate, 0, 0)
if err != nil {
- return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.rate for service %s, please check your configuration", srv.Interface)
+ return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.rate for service %svcOpts, please check your configuration", srv.Interface)
}
if tpsLimitRate < 0 {
- return fmt.Errorf("[ServiceConfig] The configuration tps.limit.rate for service %s must be positive, please check your configuration", srv.Interface)
+ return fmt.Errorf("[ServiceConfig] The configuration tps.limit.rate for service %svcOpts must be positive, please check your configuration", srv.Interface)
}
}
return nil
}
// InitExported will set exported as false atom bool
-func (s *ServerOptions) InitExported() {
- s.exported = atomic.NewBool(false)
+func (svcOpts *ServiceOptions) InitExported() {
+ svcOpts.exported = atomic.NewBool(false)
}
// IsExport will return whether the service config is exported or not
-func (s *ServerOptions) IsExport() bool {
- return s.exported.Load()
+func (svcOpts *ServiceOptions) IsExport() bool {
+ return svcOpts.exported.Load()
}
// Get Random Port
@@ -163,66 +124,61 @@
return ports
}
-func (s *ServerOptions) ExportWithoutInfo() error {
- return s.export(nil)
+func (svcOpts *ServiceOptions) ExportWithoutInfo() error {
+ return svcOpts.export(nil)
}
-func (s *ServerOptions) ExportWithInfo(info *ServiceInfo) error {
- return s.export(info)
+func (svcOpts *ServiceOptions) ExportWithInfo(info *ServiceInfo) error {
+ return svcOpts.export(info)
}
-func (s *ServerOptions) export(info *ServiceInfo) error {
- srv := s.Server
+func (svcOpts *ServiceOptions) export(info *ServiceInfo) error {
+ srv := svcOpts.Service
- var methodInfos []MethodInfo
- var methods string
if info != nil {
srv.Interface = info.InterfaceName
- methodInfos = info.Methods
- s.Id = info.InterfaceName
- s.info = info
+ svcOpts.Id = info.InterfaceName
+ svcOpts.info = info
}
- // TODO: delay export
- if s.unexported != nil && s.unexported.Load() {
+ // TODO: delay needExport
+ if svcOpts.unexported != nil && svcOpts.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported!", srv.Interface)
logger.Errorf(err.Error())
return err
}
- if s.exported != nil && s.exported.Load() {
+ if svcOpts.exported != nil && svcOpts.exported.Load() {
logger.Warnf("The service %v has already exported!", srv.Interface)
return nil
}
regUrls := make([]*common.URL, 0)
if !srv.NotRegister {
- regUrls = config.LoadRegistries(srv.RegistryIDs, s.registriesCompat, common.PROVIDER)
+ regUrls = config.LoadRegistries(srv.RegistryIDs, svcOpts.registriesCompat, common.PROVIDER)
}
- urlMap := s.getUrlMap()
- protocolConfigs := loadProtocol(srv.ProtocolIDs, s.protocolCompat)
+ urlMap := svcOpts.getUrlMap()
+ protocolConfigs := loadProtocol(srv.ProtocolIDs, svcOpts.protocolsCompat)
if len(protocolConfigs) == 0 {
- logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", srv.Interface, srv.ProtocolIDs)
+ logger.Warnf("The service %v'svcOpts '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", srv.Interface, srv.ProtocolIDs)
return nil
}
+ var invoker protocol.Invoker
ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
- proxyFactory := extension.GetProxyFactory(s.ProxyFactoryKey)
+ proxyFactory := extension.GetProxyFactory(svcOpts.ProxyFactoryKey)
for _, proto := range protocolConfigs {
- if info != nil {
- for _, info := range methodInfos {
- methods += info.Name + ","
- }
- } else {
- // registry the service reflect
- var err error
- methods, err = common.ServiceMap.Register(srv.Interface, proto.Name, srv.Group, srv.Version, s.rpcService)
- if err != nil {
- formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.",
- srv.Interface, proto.Name, err.Error())
- logger.Errorf(formatErr.Error())
- return formatErr
- }
+ // *important* Register should have been replaced by processing of ServiceInfo.
+ // but many modules like metadata need to make use of information from ServiceMap.
+ // todo(DMwangnimg): finish replacing procedure
+
+ // registry the service reflect
+ methods, err := common.ServiceMap.Register(srv.Interface, proto.Name, srv.Group, srv.Version, svcOpts.rpcService)
+ if err != nil {
+ formatErr := perrors.Errorf("The service %v needExport the protocol %v error! Error message is %v.",
+ srv.Interface, proto.Name, err.Error())
+ logger.Errorf(formatErr.Error())
+ return formatErr
}
port := proto.Port
@@ -236,12 +192,13 @@
common.WithIp(proto.Ip),
common.WithPort(port),
common.WithParams(urlMap),
- common.WithParamsValue(constant.BeanNameKey, s.Id),
+ common.WithParamsValue(constant.BeanNameKey, svcOpts.Id),
//common.WithParamsValue(constant.SslEnabledKey, strconv.FormatBool(config.GetSslEnabled())),
common.WithMethods(strings.Split(methods, ",")),
- common.WithMethodInfos(methodInfos),
+ // todo(DMwangnima): remove this
+ common.WithAttribute(constant.ServiceInfoKey, info),
common.WithToken(srv.Token),
- common.WithParamsValue(constant.MetadataTypeKey, s.metadataType),
+ common.WithParamsValue(constant.MetadataTypeKey, svcOpts.metadataType),
// fix https://github.com/apache/dubbo-go/issues/2176
common.WithParamsValue(constant.MaxServerSendMsgSize, proto.MaxServerSendMsgSize),
common.WithParamsValue(constant.MaxServerRecvMsgSize, proto.MaxServerRecvMsgSize),
@@ -251,50 +208,61 @@
}
// post process the URL to be exported
- s.postProcessConfig(ivkURL)
- // config post processor may set "export" to false
+ svcOpts.postProcessConfig(ivkURL)
+ // config post processor may set "needExport" to false
if !ivkURL.GetParamBool(constant.ExportKey, true) {
return nil
}
if len(regUrls) > 0 {
- s.cacheMutex.Lock()
- if s.cacheProtocol == nil {
+ svcOpts.cacheMutex.Lock()
+ if svcOpts.cacheProtocol == nil {
logger.Debugf(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
- s.cacheProtocol = extension.GetProtocol(constant.RegistryProtocol)
+ svcOpts.cacheProtocol = extension.GetProtocol(constant.RegistryProtocol)
}
- s.cacheMutex.Unlock()
+ svcOpts.cacheMutex.Unlock()
for _, regUrl := range regUrls {
setRegistrySubURL(ivkURL, regUrl)
- invoker := proxyFactory.GetInvoker(regUrl)
- exporter := s.cacheProtocol.Export(invoker)
+ if info == nil {
+ invoker = proxyFactory.GetInvoker(regUrl)
+ } else {
+ invoker = newInfoInvoker(regUrl, info, svcOpts.rpcService)
+ }
+ exporter := svcOpts.cacheProtocol.Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
}
- s.exporters = append(s.exporters, exporter)
+ svcOpts.exporters = append(svcOpts.exporters, exporter)
}
} else {
if ivkURL.GetParam(constant.InterfaceKey, "") == constant.MetadataServiceName {
ms, err := extension.GetLocalMetadataService("")
if err != nil {
- logger.Warnf("export org.apache.dubbo.metadata.MetadataService failed beacause of %s ! pls check if you import _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"", err)
+ logger.Warnf("needExport org.apache.dubbo.metadata.MetadataService failed beacause of %svcOpts ! pls check if you import _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"", err)
return nil
}
if err := ms.SetMetadataServiceURL(ivkURL); err != nil {
- logger.Warnf("SetMetadataServiceURL error = %s", err)
+ logger.Warnf("SetMetadataServiceURL error = %svcOpts", err)
}
}
- invoker := proxyFactory.GetInvoker(ivkURL)
+ if info == nil {
+ invoker = proxyFactory.GetInvoker(ivkURL)
+ } else {
+ invoker = newInfoInvoker(ivkURL, info, svcOpts.rpcService)
+ }
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
}
- s.exporters = append(s.exporters, exporter)
+ svcOpts.exporters = append(svcOpts.exporters, exporter)
}
publishServiceDefinition(ivkURL)
+ // this protocol would be destroyed in graceful_shutdown
+ // please refer to (https://github.com/apache/dubbo-go/issues/2429)
+ graceful_shutdown.RegisterProtocol(proto.Name)
}
- s.exported.Store(true)
+ svcOpts.exported.Store(true)
return nil
}
@@ -318,35 +286,35 @@
}
// Unexport will call unexport of all exporters service config exported
-func (s *ServerOptions) Unexport() {
- if !s.exported.Load() {
+func (svcOpts *ServiceOptions) Unexport() {
+ if !svcOpts.exported.Load() {
return
}
- if s.unexported.Load() {
+ if svcOpts.unexported.Load() {
return
}
func() {
- s.exportersLock.Lock()
- defer s.exportersLock.Unlock()
- for _, exporter := range s.exporters {
+ svcOpts.exportersLock.Lock()
+ defer svcOpts.exportersLock.Unlock()
+ for _, exporter := range svcOpts.exporters {
exporter.UnExport()
}
- s.exporters = nil
+ svcOpts.exporters = nil
}()
- s.exported.Store(false)
- s.unexported.Store(true)
+ svcOpts.exported.Store(false)
+ svcOpts.unexported.Store(true)
}
// Implement only store the @s and return
-func (s *ServerOptions) Implement(rpcService common.RPCService) {
- s.rpcService = rpcService
+func (svcOpts *ServiceOptions) Implement(rpcService common.RPCService) {
+ svcOpts.rpcService = rpcService
}
-func (s *ServerOptions) getUrlMap() url.Values {
- srv := s.Server
- app := s.applicationCompat
+func (svcOpts *ServiceOptions) getUrlMap() url.Values {
+ srv := svcOpts.Service
+ app := svcOpts.applicationCompat
urlMap := url.Values{}
// first set user params
@@ -379,15 +347,15 @@
urlMap.Set(constant.OwnerKey, app.Owner)
urlMap.Set(constant.EnvironmentKey, app.Environment)
- // filter
+ //filter
var filters string
if srv.Filter == "" {
filters = constant.DefaultServiceFilters
} else {
filters = srv.Filter
}
- if s.adaptiveService {
- filters += fmt.Sprintf(",%s", constant.AdaptiveServiceProviderFilterKey)
+ if svcOpts.adaptiveService {
+ filters += fmt.Sprintf(",%svcOpts", constant.AdaptiveServiceProviderFilterKey)
}
urlMap.Set(constant.ServiceFilterKey, filters)
@@ -409,8 +377,8 @@
urlMap.Set(constant.ServiceAuthKey, srv.Auth)
urlMap.Set(constant.ParameterSignatureEnableKey, srv.ParamSign)
- // whether to export or not
- urlMap.Set(constant.ExportKey, strconv.FormatBool(s.Export))
+ // whether to needExport or not
+ urlMap.Set(constant.ExportKey, strconv.FormatBool(svcOpts.needExport))
urlMap.Set(constant.PIDKey, fmt.Sprintf("%d", os.Getpid()))
for _, v := range srv.Methods {
@@ -431,10 +399,10 @@
}
// GetExportedUrls will return the url in service config's exporter
-func (s *ServerOptions) GetExportedUrls() []*common.URL {
- if s.exported.Load() {
+func (svcOpts *ServiceOptions) GetExportedUrls() []*common.URL {
+ if svcOpts.exported.Load() {
var urls []*common.URL
- for _, exporter := range s.exporters {
+ for _, exporter := range svcOpts.exporters {
urls = append(urls, exporter.GetInvoker().GetURL())
}
return urls
@@ -443,7 +411,7 @@
}
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
-func (s *ServerOptions) postProcessConfig(url *common.URL) {
+func (svcOpts *ServiceOptions) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessServiceConfig(url)
}
@@ -463,3 +431,25 @@
remoteMetadataService.PublishServiceDefinition(url)
}
}
+
+// todo(DMwangnima): think about moving this function to a common place(e.g. /common/config)
+func getRegistryIds(registries map[string]*global.RegistryConfig) []string {
+ ids := make([]string, 0)
+ for key := range registries {
+ ids = append(ids, key)
+ }
+ return removeDuplicateElement(ids)
+}
+
+// removeDuplicateElement remove duplicate element
+func removeDuplicateElement(items []string) []string {
+ result := make([]string, 0, len(items))
+ temp := map[string]struct{}{}
+ for _, item := range items {
+ if _, ok := temp[item]; !ok && item != "" {
+ temp[item] = struct{}{}
+ result = append(result, item)
+ }
+ }
+ return result
+}
diff --git a/server/compat.go b/server/compat.go
index f22c8cc..9e34254 100644
--- a/server/compat.go
+++ b/server/compat.go
@@ -77,3 +77,14 @@
RequestTimeout: c.RequestTimeout,
}
}
+
+func compatProtocolConfig(c *global.ProtocolConfig) *config.ProtocolConfig {
+ return &config.ProtocolConfig{
+ Name: c.Name,
+ Ip: c.Ip,
+ Port: c.Port,
+ Params: c.Params,
+ MaxServerSendMsgSize: c.MaxServerSendMsgSize,
+ MaxServerRecvMsgSize: c.MaxServerRecvMsgSize,
+ }
+}
diff --git a/server/options.go b/server/options.go
index 36b75a8..6dff299 100644
--- a/server/options.go
+++ b/server/options.go
@@ -18,24 +18,221 @@
package server
import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/config"
- "dubbo.apache.org/dubbo-go/v3/global"
- "dubbo.apache.org/dubbo-go/v3/protocol"
- "go.uber.org/atomic"
+ "strconv"
"sync"
+ "time"
+)
+
+import (
+ "github.com/creasty/defaults"
+
+ "github.com/dubbogo/gost/log/logger"
+
+ perrors "github.com/pkg/errors"
+
+ "go.uber.org/atomic"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/config"
+ aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
+ "dubbo.apache.org/dubbo-go/v3/global"
+ "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/registry"
)
type ServerOptions struct {
+ Provider *global.ProviderConfig
+ Application *global.ApplicationConfig
+ Registries map[string]*global.RegistryConfig
+ Protocols map[string]*global.ProtocolConfig
+ Tracings map[string]*global.TracingConfig
+ Shutdown *global.ShutdownConfig
+
+ providerCompat *config.ProviderConfig
+}
+
+func defaultServerOptions() *ServerOptions {
+ return &ServerOptions{
+ Application: global.DefaultApplicationConfig(),
+ Provider: global.DefaultProviderConfig(),
+ Shutdown: global.DefaultShutdownConfig(),
+ }
+}
+
+// todo(DMwangnima): think about the timing to initialize Registry, Protocol, Tracing
+func (srvOpts *ServerOptions) init(opts ...ServerOption) error {
+ for _, opt := range opts {
+ opt(srvOpts)
+ }
+ if err := defaults.Set(srvOpts); err != nil {
+ return err
+ }
+
+ prov := srvOpts.Provider
+
+ prov.RegistryIDs = commonCfg.TranslateIds(prov.RegistryIDs)
+ if len(prov.RegistryIDs) <= 0 {
+ prov.RegistryIDs = getRegistryIds(srvOpts.Registries)
+ }
+
+ prov.ProtocolIDs = commonCfg.TranslateIds(prov.ProtocolIDs)
+
+ if prov.TracingKey == "" && len(srvOpts.Tracings) > 0 {
+ for key := range srvOpts.Tracings {
+ prov.TracingKey = key
+ break
+ }
+ }
+
+ if err := commonCfg.Verify(prov); err != nil {
+ return err
+ }
+
+ // enable adaptive service verbose
+ if prov.AdaptiveServiceVerbose {
+ if !prov.AdaptiveService {
+ return perrors.Errorf("The adaptive service is disabled, " +
+ "adaptive service verbose should be disabled either.")
+ }
+ logger.Infof("adaptive service verbose is enabled.")
+ logger.Debugf("debug-level info could be shown.")
+ aslimiter.Verbose = true
+ }
+
+ // init graceful_shutdown
+ graceful_shutdown.Init(graceful_shutdown.WithShutdown_Config(srvOpts.Shutdown))
+
+ return nil
+}
+
+type ServerOption func(*ServerOptions)
+
+// ---------- For user ----------
+
+// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
+func WithServer_Filter(filter string) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Provider.Filter = filter
+ }
+}
+
+// todo(DMwangnima): think about a more ideal configuration style
+func WithServer_RegistryIDs(registryIDs []string) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Provider.RegistryIDs = registryIDs
+ }
+}
+
+func WithServer_Registry(key string, opts ...registry.Option) ServerOption {
+ regOpts := registry.DefaultOptions()
+ for _, opt := range opts {
+ opt(regOpts)
+ }
+
+ return func(srvOpts *ServerOptions) {
+ if srvOpts.Registries == nil {
+ srvOpts.Registries = make(map[string]*global.RegistryConfig)
+ }
+ srvOpts.Registries[key] = regOpts.Registry
+ }
+}
+
+// todo(DMwangnima): think about a more ideal configuration style
+func WithServer_ProtocolIDs(protocolIDs []string) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Provider.ProtocolIDs = protocolIDs
+ }
+}
+
+func WithServer_Protocol(key string, opts ...protocol.Option) ServerOption {
+ proOpts := protocol.DefaultOptions()
+ for _, opt := range opts {
+ opt(proOpts)
+ }
+
+ return func(srvOpts *ServerOptions) {
+ if srvOpts.Protocols == nil {
+ srvOpts.Protocols = make(map[string]*global.ProtocolConfig)
+ }
+ srvOpts.Protocols[key] = proOpts.Protocol
+ }
+}
+
+func WithServer_TracingKey(tracingKey string) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Provider.TracingKey = tracingKey
+ }
+}
+
+// todo(DMwangnima): this configuration would be used by filter/hystrix
+// think about a more ideal way to configure
+func WithServer_FilterConf(conf interface{}) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Provider.FilterConf = conf
+ }
+}
+
+func WithServer_AdaptiveService() ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Provider.AdaptiveService = true
+ }
+}
+
+func WithServer_AdaptiveServiceVerbose() ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Provider.AdaptiveServiceVerbose = true
+ }
+}
+
+// ========== For framework ==========
+// These functions should not be invoked by users
+
+func SetServer_Application(application *global.ApplicationConfig) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Application = application
+ }
+}
+
+func SetServer_Registries(regs map[string]*global.RegistryConfig) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Registries = regs
+ }
+}
+
+func SetServer_Protocols(pros map[string]*global.ProtocolConfig) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Protocols = pros
+ }
+}
+
+func SetServer_Tracings(tras map[string]*global.TracingConfig) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Tracings = tras
+ }
+}
+
+func SetServer_Shutdown(shutdown *global.ShutdownConfig) ServerOption {
+ return func(opts *ServerOptions) {
+ opts.Shutdown = shutdown
+ }
+}
+
+type ServiceOptions struct {
Application *global.ApplicationConfig
Provider *global.ProviderConfig
- Server *global.ServiceConfig
+ Service *global.ServiceConfig
Registries map[string]*global.RegistryConfig
+ Protocols map[string]*global.ProtocolConfig
Id string
unexported *atomic.Bool
exported *atomic.Bool
- Export bool
+ needExport bool
metadataType string
info *ServiceInfo
ProxyFactoryKey string
@@ -49,127 +246,306 @@
methodsCompat []*config.MethodConfig
applicationCompat *config.ApplicationConfig
registriesCompat map[string]*config.RegistryConfig
- protocolCompat map[string]*config.ProtocolConfig
+ protocolsCompat map[string]*config.ProtocolConfig
}
-func defaultServerOptions() *ServerOptions {
- return &ServerOptions{
- Provider: global.DefaultProviderConfig(),
+func defaultServiceOptions() *ServiceOptions {
+ return &ServiceOptions{
+ Service: global.DefaultServiceConfig(),
+ Application: global.DefaultApplicationConfig(),
+ unexported: atomic.NewBool(false),
+ exported: atomic.NewBool(false),
+ needExport: true,
}
}
-func (srvOpts *ServerOptions) init(opts ...ServerOption) error {
+func (svcOpts *ServiceOptions) init(opts ...ServiceOption) error {
for _, opt := range opts {
- opt(srvOpts)
+ opt(svcOpts)
}
- return nil
+ if err := defaults.Set(svcOpts); err != nil {
+ return err
+ }
+
+ srv := svcOpts.Service
+
+ svcOpts.exported = atomic.NewBool(false)
+
+ application := svcOpts.Application
+ if application != nil {
+ svcOpts.applicationCompat = compatApplicationConfig(application)
+ if err := svcOpts.applicationCompat.Init(); err != nil {
+ return err
+ }
+ // todo(DMwangnima): make this clearer
+ // this statement is responsible for setting rootConfig.Application
+ // since many modules would retrieve this information directly.
+ config.GetRootConfig().Application = svcOpts.applicationCompat
+ svcOpts.metadataType = svcOpts.applicationCompat.MetadataType
+ if srv.Group == "" {
+ srv.Group = svcOpts.applicationCompat.Group
+ }
+ if srv.Version == "" {
+ srv.Version = svcOpts.applicationCompat.Version
+ }
+ }
+ svcOpts.unexported = atomic.NewBool(false)
+
+ // initialize Registries
+ if len(srv.RCRegistriesMap) == 0 {
+ srv.RCRegistriesMap = svcOpts.Registries
+ }
+ if len(srv.RCRegistriesMap) > 0 {
+ svcOpts.registriesCompat = make(map[string]*config.RegistryConfig)
+ for key, reg := range srv.RCRegistriesMap {
+ svcOpts.registriesCompat[key] = compatRegistryConfig(reg)
+ if err := svcOpts.registriesCompat[key].Init(); err != nil {
+ return err
+ }
+ }
+ }
+
+ // initialize Protocols
+ if len(srv.RCProtocolsMap) == 0 {
+ srv.RCProtocolsMap = svcOpts.Protocols
+ }
+ if len(srv.RCProtocolsMap) > 0 {
+ svcOpts.protocolsCompat = make(map[string]*config.ProtocolConfig)
+ for key, pro := range srv.RCProtocolsMap {
+ svcOpts.protocolsCompat[key] = compatProtocolConfig(pro)
+ if err := svcOpts.protocolsCompat[key].Init(); err != nil {
+ return err
+ }
+ }
+ }
+
+ srv.RegistryIDs = commonCfg.TranslateIds(srv.RegistryIDs)
+ if len(srv.RegistryIDs) <= 0 {
+ srv.RegistryIDs = svcOpts.Provider.RegistryIDs
+ }
+ if srv.RegistryIDs == nil || len(srv.RegistryIDs) <= 0 {
+ srv.NotRegister = true
+ }
+
+ srv.ProtocolIDs = commonCfg.TranslateIds(srv.ProtocolIDs)
+ if len(srv.ProtocolIDs) <= 0 {
+ srv.ProtocolIDs = svcOpts.Provider.ProtocolIDs
+ }
+ if len(srv.ProtocolIDs) <= 0 {
+ for name := range svcOpts.Protocols {
+ srv.ProtocolIDs = append(srv.ProtocolIDs, name)
+ }
+ }
+
+ if srv.TracingKey == "" {
+ srv.TracingKey = svcOpts.Provider.TracingKey
+ }
+
+ err := svcOpts.check()
+ if err != nil {
+ panic(err)
+ }
+ svcOpts.needExport = true
+ return commonCfg.Verify(svcOpts)
}
-type ServerOption func(*ServerOptions)
+type ServiceOption func(*ServiceOptions)
// ---------- For user ----------
-func WithRegistryIDs(registryIDs []string) ServerOption {
- return func(cfg *ServerOptions) {
+// todo(DMwangnima): think about a more ideal configuration style
+func WithRegistryIDs(registryIDs []string) ServiceOption {
+ return func(cfg *ServiceOptions) {
if len(registryIDs) <= 0 {
- cfg.Server.RegistryIDs = registryIDs
+ cfg.Service.RegistryIDs = registryIDs
}
}
}
-func WithFilter(filter string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.Filter = filter
+// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
+func WithFilter(filter string) ServiceOption {
+ return func(cfg *ServiceOptions) {
+ cfg.Service.Filter = filter
}
}
-func WithProtocolIDs(protocolIDs []string) ServerOption {
- return func(cfg *ServerOptions) {
+// todo(DMwangnima): think about a more ideal configuration style
+func WithProtocolIDs(protocolIDs []string) ServiceOption {
+ return func(cfg *ServiceOptions) {
if len(protocolIDs) <= 0 {
- cfg.Server.ProtocolIDs = protocolIDs
+ cfg.Service.ProtocolIDs = protocolIDs
}
}
}
-func WithTracingKey(tracingKey string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.TracingKey = tracingKey
+func WithTracingKey(tracingKey string) ServiceOption {
+ return func(cfg *ServiceOptions) {
+ cfg.Service.TracingKey = tracingKey
}
}
-func WithLoadBalance(loadBalance string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.Loadbalance = loadBalance
+// ========== LoadBalance Strategy ==========
+
+func WithLoadBalanceConsistentHashing() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Loadbalance = constant.LoadBalanceKeyConsistentHashing
}
}
-func WithWarmUp(warmUp string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.Warmup = warmUp
+func WithLoadBalanceLeastActive() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Loadbalance = constant.LoadBalanceKeyLeastActive
}
}
-func WithCluster(cluster string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.Cluster = cluster
+func WithLoadBalanceRandom() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Loadbalance = constant.LoadBalanceKeyRandom
}
}
-func WithGroup(group string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.Group = group
+func WithLoadBalanceRoundRobin() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Loadbalance = constant.LoadBalanceKeyRoundRobin
}
}
-func WithVersion(version string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.Version = version
+func WithLoadBalanceP2C() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Loadbalance = constant.LoadBalanceKeyP2C
}
}
-func WithSerialization(serialization string) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.Serialization = serialization
+func WithLoadBalanceXDSRingHash() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Loadbalance = constant.LoadBalanceKeyLeastActive
}
}
-func WithNotRegister(notRegister bool) ServerOption {
- return func(cfg *ServerOptions) {
- cfg.Server.NotRegister = notRegister
+// warmUp is in seconds
+func WithWarmUp(warmUp time.Duration) ServiceOption {
+ return func(opts *ServiceOptions) {
+ warmUpSec := int(warmUp / time.Second)
+ opts.Service.Warmup = strconv.Itoa(warmUpSec)
}
}
-// ----------From framework----------
+// ========== Cluster Strategy ==========
-func WithApplicationConfig(opts ...global.ApplicationOption) ServerOption {
- appCfg := new(global.ApplicationConfig)
- for _, opt := range opts {
- opt(appCfg)
- }
-
- return func(opts *ServerOptions) {
- opts.Application = appCfg
+func WithClusterAvailable() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyAvailable
}
}
-func WithProviderConfig(opts ...global.ProviderOption) ServerOption {
- providerCfg := new(global.ProviderConfig)
- for _, opt := range opts {
- opt(providerCfg)
- }
-
- return func(opts *ServerOptions) {
- opts.Provider = providerCfg
+func WithClusterBroadcast() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyBroadcast
}
}
-func WithServiceConfig(opts ...global.ServiceOption) ServerOption {
- serviceCfg := new(global.ServiceConfig)
- for _, opt := range opts {
- opt(serviceCfg)
+func WithClusterFailBack() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyFailback
}
+}
- return func(opts *ServerOptions) {
- opts.Server = serviceCfg
+func WithClusterFailFast() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyFailfast
+ }
+}
+
+func WithClusterFailOver() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyFailover
+ }
+}
+
+func WithClusterFailSafe() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyFailsafe
+ }
+}
+
+func WithClusterForking() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyForking
+ }
+}
+
+func WithClusterZoneAware() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyZoneAware
+ }
+}
+
+func WithClusterAdaptiveService() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Cluster = constant.ClusterKeyAdaptiveService
+ }
+}
+
+func WithGroup(group string) ServiceOption {
+ return func(cfg *ServiceOptions) {
+ cfg.Service.Group = group
+ }
+}
+
+func WithVersion(version string) ServiceOption {
+ return func(cfg *ServiceOptions) {
+ cfg.Service.Version = version
+ }
+}
+
+func WithJSON() ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Serialization = constant.JSONSerialization
+ }
+}
+
+// WithToken should be used with WithFilter("token")
+func WithToken(token string) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Token = token
+ }
+}
+
+func WithNotRegister() ServiceOption {
+ return func(cfg *ServiceOptions) {
+ cfg.Service.NotRegister = true
+ }
+}
+
+// ----------For framework----------
+// These functions should not be invoked by users
+
+func SetApplication(application *global.ApplicationConfig) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Application = application
+ }
+}
+
+func SetProvider(provider *global.ProviderConfig) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Provider = provider
+ }
+}
+
+func SetService(service *global.ServiceConfig) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service = service
+ }
+}
+
+func SetRegistries(regs map[string]*global.RegistryConfig) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Registries = regs
+ }
+}
+
+func SetProtocols(pros map[string]*global.ProtocolConfig) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Protocols = pros
}
}
diff --git a/server/server.go b/server/server.go
index 763af38..3cbcd84 100644
--- a/server/server.go
+++ b/server/server.go
@@ -19,7 +19,18 @@
import (
"context"
+ "fmt"
+)
+
+import (
+ "github.com/pkg/errors"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/metadata"
"dubbo.apache.org/dubbo-go/v3/protocol"
+ registry_exposed "dubbo.apache.org/dubbo-go/v3/registry/exposed_tmp"
)
type Server struct {
@@ -37,14 +48,114 @@
Meta map[string]interface{}
}
+type infoInvoker struct {
+ url *common.URL
+ base *protocol.BaseInvoker
+ info *ServiceInfo
+ svc common.RPCService
+ methodMap map[string]*MethodInfo
+}
+
+func (ii *infoInvoker) init() {
+ url := ii.base.GetURL()
+ if url.SubURL != nil {
+ url = url.SubURL
+ }
+ ii.url = url
+ methodMap := make(map[string]*MethodInfo)
+ for i := range ii.info.Methods {
+ methodMap[ii.info.Methods[i].Name] = &ii.info.Methods[i]
+ }
+ ii.methodMap = methodMap
+}
+
+func (ii *infoInvoker) GetURL() *common.URL {
+ return ii.base.GetURL()
+}
+
+func (ii *infoInvoker) IsAvailable() bool {
+ return ii.base.IsAvailable()
+}
+
+func (ii *infoInvoker) Destroy() {
+ ii.base.Destroy()
+}
+
+func (ii *infoInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
+ name := invocation.MethodName()
+ args := invocation.Arguments()
+ result := new(protocol.RPCResult)
+ if method, ok := ii.methodMap[name]; ok {
+ res, err := method.MethodFunc(ctx, args, ii.svc)
+ result.SetResult(res)
+ result.SetError(err)
+ return result
+ }
+ result.SetError(fmt.Errorf("no match method for %s", name))
+
+ return result
+}
+
+func newInfoInvoker(url *common.URL, info *ServiceInfo, svc common.RPCService) protocol.Invoker {
+ invoker := &infoInvoker{
+ base: protocol.NewBaseInvoker(url),
+ info: info,
+ svc: svc,
+ }
+ invoker.init()
+ return invoker
+}
+
// Register assemble invoker chains like ProviderConfig.Load, init a service per call
-func (s *Server) Register(handler interface{}, info *ServiceInfo, opts ...ServerOption) error {
- for _, opt := range opts {
- opt(s.cfg)
+func (s *Server) Register(handler interface{}, info *ServiceInfo, opts ...ServiceOption) error {
+ if s.cfg == nil {
+ return errors.New("Server has not been initialized, please use NewServer() to create Server")
+ }
+ var svcOpts []ServiceOption
+ appCfg := s.cfg.Application
+ proCfg := s.cfg.Provider
+ prosCfg := s.cfg.Protocols
+ regsCfg := s.cfg.Registries
+ // todo(DMwangnima): record the registered service
+ newSvcOpts := defaultServiceOptions()
+ if appCfg != nil {
+ svcOpts = append(svcOpts,
+ SetApplication(s.cfg.Application),
+ )
+ }
+ if proCfg != nil {
+ svcOpts = append(svcOpts,
+ SetProvider(proCfg),
+ )
+ }
+ if prosCfg != nil {
+ svcOpts = append(svcOpts,
+ SetProtocols(prosCfg),
+ )
+ }
+ if regsCfg != nil {
+ svcOpts = append(svcOpts,
+ SetRegistries(regsCfg),
+ )
}
- // ProviderConfig.Load
- // url
+ // options passed by users have higher priority
+ svcOpts = append(svcOpts, opts...)
+ if err := newSvcOpts.init(svcOpts...); err != nil {
+ return err
+ }
+ newSvcOpts.Implement(handler)
+ if err := newSvcOpts.ExportWithInfo(info); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (s *Server) Serve() error {
+ metadata.ExportMetadataService()
+ registry_exposed.RegisterServiceInstance(s.cfg.Application.Name, s.cfg.Application.Tag, s.cfg.Application.MetadataType)
+ select {}
return nil
}