Feature triple api tuning (#2525)
* feat: introduce connection concept
* fix CI
* fix lint
diff --git a/client/action.go b/client/action.go
index 7cc67a0..b5a0faf 100644
--- a/client/action.go
+++ b/client/action.go
@@ -39,6 +39,7 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
@@ -54,7 +55,7 @@
func updateOrCreateMeshURL(opts *ReferenceOptions) {
ref := opts.Reference
- con := opts.cliOpts.Consumer
+ con := opts.Consumer
if ref.URL != "" {
logger.Infof("URL specified explicitly %v", ref.URL)
@@ -98,8 +99,7 @@
func (refOpts *ReferenceOptions) refer(srv common.RPCService, info *ClientInfo) {
ref := refOpts.Reference
- clientOpts := refOpts.cliOpts
- con := clientOpts.Consumer
+ con := refOpts.Consumer
var methods []string
if info != nil {
@@ -107,8 +107,10 @@
methods = info.MethodNames
refOpts.id = info.InterfaceName
refOpts.info = info
- } else {
+ } else if srv != nil {
refOpts.id = common.GetReference(srv)
+ } else {
+ refOpts.id = ref.InterfaceName
}
// If adaptive service is enabled,
// the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively.
@@ -139,77 +141,23 @@
updateOrCreateMeshURL(refOpts)
// retrieving urls from config, and appending the urls to refOpts.urls
- if err := refOpts.processURL(cfgURL); err != nil {
+ urls, err := processURL(ref, refOpts.registriesCompat, cfgURL)
+ if err != nil {
panic(err)
}
- // Get invokers according to refOpts.urls
- var (
- invoker protocol.Invoker
- regURL *common.URL
- )
- invokers := make([]protocol.Invoker, len(refOpts.urls))
- for i, u := range refOpts.urls {
- if u.Protocol == constant.ServiceRegistryProtocol {
- invoker = extension.GetProtocol(constant.RegistryProtocol).Refer(u)
- } else {
- invoker = extension.GetProtocol(u.Protocol).Refer(u)
- }
-
- if ref.URL != "" {
- invoker = protocolwrapper.BuildInvokerChain(invoker, constant.ReferenceFilterKey)
- }
-
- invokers[i] = invoker
- if u.Protocol == constant.RegistryProtocol {
- regURL = u
- }
+ // build invoker according to urls
+ invoker, err := buildInvoker(urls, ref)
+ if err != nil {
+ panic(err)
}
-
- // TODO(hxmhlt): decouple from directory, config should not depend on directory module
- if len(invokers) == 1 {
- refOpts.invoker = invokers[0]
- if ref.URL != "" {
- hitClu := constant.ClusterKeyFailover
- if u := refOpts.invoker.GetURL(); u != nil {
- hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
- }
- cluster, err := extension.GetCluster(hitClu)
- if err != nil {
- panic(err)
- } else {
- refOpts.invoker = cluster.Join(static.NewDirectory(invokers))
- }
- }
- } else {
- var hitClu string
- if regURL != nil {
- // for multi-subscription scenario, use 'zone-aware' policy by default
- hitClu = constant.ClusterKeyZoneAware
- } else {
- // not a registry url, must be direct invoke.
- hitClu = constant.ClusterKeyFailover
- if u := invokers[0].GetURL(); u != nil {
- hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
- }
- }
- cluster, err := extension.GetCluster(hitClu)
- if err != nil {
- panic(err)
- } else {
- refOpts.invoker = cluster.Join(static.NewDirectory(invokers))
- }
- }
+ refOpts.urls = urls
+ refOpts.invoker = invoker
// publish consumer's metadata
publishServiceDefinition(cfgURL)
// create proxy
- if info == nil {
- // todo(DMwangnima): think about a more ideal way
- if con == nil {
- panic("client must be configured with ConsumerConfig when using config.Load")
- }
-
+ if info == nil && srv != nil {
if ref.Async {
var callback common.CallbackResponse
if asyncSrv, ok := srv.(common.AsyncCallbackService); ok {
@@ -226,8 +174,8 @@
graceful_shutdown.RegisterProtocol(ref.Protocol)
}
-func (refOpts *ReferenceOptions) processURL(cfgURL *common.URL) error {
- ref := refOpts.Reference
+func processURL(ref *global.ReferenceConfig, regsCompat map[string]*config.RegistryConfig, cfgURL *common.URL) ([]*common.URL, error) {
+ var urls []*common.URL
if ref.URL != "" { // use user-specific urls
/*
Two types of URL are allowed for refOpts.URL:
@@ -244,11 +192,11 @@
for _, urlStr := range urlStrings {
serviceURL, err := common.NewURL(urlStr, common.WithProtocol(ref.Protocol))
if err != nil {
- return fmt.Errorf(fmt.Sprintf("url configuration error, please check your configuration, user specified URL %v refer error, error message is %v ", urlStr, err.Error()))
+ return nil, fmt.Errorf(fmt.Sprintf("url configuration error, please check your configuration, user specified URL %v refer error, error message is %v ", urlStr, err.Error()))
}
if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol
serviceURL.SubURL = cfgURL
- refOpts.urls = append(refOpts.urls, serviceURL)
+ urls = append(urls, serviceURL)
} else { // serviceURL in this branch is the target endpoint IP address
if serviceURL.Path == "" {
serviceURL.Path = "/" + ref.InterfaceName
@@ -257,17 +205,93 @@
// other stuff, e.g. IP, port, etc., are same as serviceURL
newURL := serviceURL.MergeURL(cfgURL)
newURL.AddParam("peer", "true")
- refOpts.urls = append(refOpts.urls, newURL)
+ urls = append(urls, newURL)
}
}
} else { // use registry configs
- refOpts.urls = config.LoadRegistries(ref.RegistryIDs, refOpts.registriesCompat, common.CONSUMER)
+ urls = config.LoadRegistries(ref.RegistryIDs, regsCompat, common.CONSUMER)
// set url to regURLs
- for _, regURL := range refOpts.urls {
+ for _, regURL := range urls {
regURL.SubURL = cfgURL
}
}
- return nil
+ return urls, nil
+}
+
+func buildInvoker(urls []*common.URL, ref *global.ReferenceConfig) (protocol.Invoker, error) {
+ var (
+ invoker protocol.Invoker
+ regURL *common.URL
+ )
+ invokers := make([]protocol.Invoker, len(urls))
+ for i, u := range urls {
+ if u.Protocol == constant.ServiceRegistryProtocol {
+ invoker = extension.GetProtocol(constant.RegistryProtocol).Refer(u)
+ } else {
+ invoker = extension.GetProtocol(u.Protocol).Refer(u)
+ }
+
+ if ref.URL != "" {
+ invoker = protocolwrapper.BuildInvokerChain(invoker, constant.ReferenceFilterKey)
+ }
+
+ if u.Protocol == constant.RegistryProtocol {
+ regURL = u
+ }
+ invokers[i] = invoker
+ }
+
+ var resInvoker protocol.Invoker
+ // TODO(hxmhlt): decouple from directory, config should not depend on directory module
+ if len(invokers) == 1 {
+ resInvoker = invokers[0]
+ if ref.URL != "" {
+ hitClu := constant.ClusterKeyFailover
+ if u := resInvoker.GetURL(); u != nil {
+ hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
+ }
+ cluster, err := extension.GetCluster(hitClu)
+ if err != nil {
+ return nil, err
+ }
+ resInvoker = cluster.Join(static.NewDirectory(invokers))
+ }
+ return resInvoker, nil
+ }
+
+ var hitClu string
+ if regURL != nil {
+ // for multi-subscription scenario, use 'zone-aware' policy by default
+ hitClu = constant.ClusterKeyZoneAware
+ } else {
+ // not a registry url, must be direct invoke.
+ hitClu = constant.ClusterKeyFailover
+ if u := invokers[0].GetURL(); u != nil {
+ hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
+ }
+ }
+ cluster, err := extension.GetCluster(hitClu)
+ if err != nil {
+ return nil, err
+ }
+ resInvoker = cluster.Join(static.NewDirectory(invokers))
+
+ return resInvoker, nil
+}
+
+func publishServiceDefinition(url *common.URL) {
+ localService, err := extension.GetLocalMetadataService(constant.DefaultKey)
+ if err != nil {
+ logger.Warnf("get local metadata service failed, please check if you have imported _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"")
+ return
+ }
+ localService.PublishServiceDefinition(url)
+ if url.GetParam(constant.MetadataTypeKey, "") != constant.RemoteMetadataStorageType {
+ return
+ }
+ if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
+ remoteMetadataService.PublishServiceDefinition(url)
+ }
}
func (refOpts *ReferenceOptions) CheckAvailable() bool {
@@ -288,10 +312,8 @@
if refOpts.pxy != nil {
refOpts.pxy.Implement(v)
} else if refOpts.info != nil {
- refOpts.info.ClientInjectFunc(v, &Client{
- cliOpts: refOpts.cliOpts,
- info: refOpts.info,
- refOpts: map[string]*ReferenceOptions{},
+ refOpts.info.ConnectionInjectFunc(v, &Connection{
+ refOpts: refOpts,
})
}
}
@@ -309,8 +331,8 @@
func (refOpts *ReferenceOptions) getURLMap() url.Values {
ref := refOpts.Reference
app := refOpts.applicationCompat
- metrics := refOpts.cliOpts.Metrics
- tracing := refOpts.cliOpts.Otel.TracingConfig
+ metrics := refOpts.Metrics
+ tracing := refOpts.Otel.TracingConfig
urlMap := url.Values{}
// first set user params
@@ -397,18 +419,3 @@
p.PostProcessReferenceConfig(url)
}
}
-
-func publishServiceDefinition(url *common.URL) {
- localService, err := extension.GetLocalMetadataService(constant.DefaultKey)
- if err != nil {
- logger.Warnf("get local metadata service failed, please check if you have imported _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"")
- return
- }
- localService.PublishServiceDefinition(url)
- if url.GetParam(constant.MetadataTypeKey, "") != constant.RemoteMetadataStorageType {
- return
- }
- if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
- remoteMetadataService.PublishServiceDefinition(url)
- }
-}
diff --git a/client/client.go b/client/client.go
index b905c64..4a9a722 100644
--- a/client/client.go
+++ b/client/client.go
@@ -20,32 +20,24 @@
import (
"context"
- "fmt"
)
import (
- "github.com/pkg/errors"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
+// ConsumerConfig
type Client struct {
- info *ClientInfo
-
cliOpts *ClientOptions
- refOpts map[string]*ReferenceOptions
}
type ClientInfo struct {
- InterfaceName string
- MethodNames []string
- ClientInjectFunc func(dubboCliRaw interface{}, cli *Client)
- Meta map[string]interface{}
+ InterfaceName string
+ MethodNames []string
+ ConnectionInjectFunc func(dubboCliRaw interface{}, conn *Connection)
+ Meta map[string]interface{}
}
type ClientDefinition struct {
@@ -53,84 +45,95 @@
Info *ClientInfo
}
-func (cli *Client) call(ctx context.Context, paramsRawVals []interface{}, interfaceName, methodName, callType string, opts ...CallOption) (protocol.Result, error) {
- // get a default CallOptions
- // apply CallOption
+// InterfaceName/group/version /ReferenceConfig
+type Connection struct {
+ refOpts *ReferenceOptions
+}
+
+func (conn *Connection) call(ctx context.Context, reqs []interface{}, resp interface{}, methodName, callType string, opts ...CallOption) (protocol.Result, error) {
options := newDefaultCallOptions()
for _, opt := range opts {
opt(options)
}
-
- inv, err := generateInvocation(methodName, paramsRawVals, callType, options)
+ inv, err := generateInvocation(methodName, reqs, resp, callType, options)
if err != nil {
return nil, err
}
-
- refOption := cli.refOpts[common.ServiceKey(interfaceName, options.Group, options.Version)]
- if refOption == nil {
- return nil, fmt.Errorf("no service found for %s/%s:%s, please check if the service has been registered", options.Group, interfaceName, options.Version)
- }
-
- return refOption.invoker.Invoke(ctx, inv), nil
-
+ return conn.refOpts.invoker.Invoke(ctx, inv), nil
}
-func (cli *Client) CallUnary(ctx context.Context, req, resp interface{}, interfaceName, methodName string, opts ...CallOption) error {
- res, err := cli.call(ctx, []interface{}{req, resp}, interfaceName, methodName, constant.CallUnary, opts...)
+func (conn *Connection) CallUnary(ctx context.Context, reqs []interface{}, resp interface{}, methodName string, opts ...CallOption) error {
+ res, err := conn.call(ctx, reqs, resp, methodName, constant.CallUnary, opts...)
if err != nil {
return err
}
return res.Error()
}
-func (cli *Client) CallClientStream(ctx context.Context, interfaceName, methodName string, opts ...CallOption) (interface{}, error) {
- res, err := cli.call(ctx, nil, interfaceName, methodName, constant.CallClientStream, opts...)
+func (conn *Connection) CallClientStream(ctx context.Context, methodName string, opts ...CallOption) (interface{}, error) {
+ res, err := conn.call(ctx, nil, nil, methodName, constant.CallClientStream, opts...)
if err != nil {
return nil, err
}
return res.Result(), res.Error()
}
-func (cli *Client) CallServerStream(ctx context.Context, req interface{}, interfaceName, methodName string, opts ...CallOption) (interface{}, error) {
- res, err := cli.call(ctx, []interface{}{req}, interfaceName, methodName, constant.CallServerStream, opts...)
+func (conn *Connection) CallServerStream(ctx context.Context, req interface{}, methodName string, opts ...CallOption) (interface{}, error) {
+ res, err := conn.call(ctx, []interface{}{req}, nil, methodName, constant.CallServerStream, opts...)
if err != nil {
return nil, err
}
return res.Result(), res.Error()
}
-func (cli *Client) CallBidiStream(ctx context.Context, interfaceName, methodName string, opts ...CallOption) (interface{}, error) {
- res, err := cli.call(ctx, nil, interfaceName, methodName, constant.CallBidiStream, opts...)
+func (conn *Connection) CallBidiStream(ctx context.Context, methodName string, opts ...CallOption) (interface{}, error) {
+ res, err := conn.call(ctx, nil, nil, methodName, constant.CallBidiStream, opts...)
if err != nil {
return nil, err
}
return res.Result(), res.Error()
}
-func (cli *Client) Init(info *ClientInfo, opts ...ReferenceOption) (string, string, error) {
- if info == nil {
- return "", "", errors.New("ClientInfo is nil")
- }
-
- newRefOptions := defaultReferenceOptions()
- err := newRefOptions.init(cli, opts...)
- if err != nil {
- return "", "", err
- }
-
- ref := newRefOptions.Reference
- cli.refOpts[common.ServiceKey(info.InterfaceName, ref.Group, ref.Version)] = newRefOptions
-
- newRefOptions.ReferWithInfo(info)
-
- return ref.Group, ref.Version, nil
+func (cli *Client) Dial(interfaceName string, opts ...ReferenceOption) (*Connection, error) {
+ return cli.dial(interfaceName, nil, opts...)
}
-func generateInvocation(methodName string, paramsRawVals []interface{}, callType string, opts *CallOptions) (protocol.Invocation, error) {
+func (cli *Client) DialWithInfo(interfaceName string, info *ClientInfo, opts ...ReferenceOption) (*Connection, error) {
+ return cli.dial(interfaceName, info, opts...)
+}
+
+func (cli *Client) dial(interfaceName string, info *ClientInfo, opts ...ReferenceOption) (*Connection, error) {
+ newRefOpts := defaultReferenceOptions()
+ finalOpts := []ReferenceOption{
+ setReference(cli.cliOpts.overallReference),
+ setApplicationCompat(cli.cliOpts.applicationCompat),
+ setRegistriesCompat(cli.cliOpts.registriesCompat),
+ setConsumer(cli.cliOpts.Consumer),
+ // this config must be set after Reference initialized
+ setInterfaceName(interfaceName),
+ }
+ finalOpts = append(finalOpts, opts...)
+ if err := newRefOpts.init(finalOpts...); err != nil {
+ return nil, err
+ }
+ newRefOpts.ReferWithInfo(info)
+
+ return &Connection{refOpts: newRefOpts}, nil
+}
+func generateInvocation(methodName string, reqs []interface{}, resp interface{}, callType string, opts *CallOptions) (protocol.Invocation, error) {
+ var paramsRawVals []interface{}
+ for _, req := range reqs {
+ paramsRawVals = append(paramsRawVals, req)
+ }
+ if resp != nil {
+ paramsRawVals = append(paramsRawVals, resp)
+ }
inv := invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
invocation_impl.WithAttachment(constant.TimeoutKey, opts.RequestTimeout),
invocation_impl.WithAttachment(constant.RetriesKey, opts.Retries),
+ invocation_impl.WithArguments(reqs),
+ invocation_impl.WithReply(resp),
invocation_impl.WithParameterRawValues(paramsRawVals),
)
inv.SetAttribute(constant.CallTypeKey, callType)
@@ -145,6 +148,5 @@
}
return &Client{
cliOpts: newCliOpts,
- refOpts: make(map[string]*ReferenceOptions),
}, nil
}
diff --git a/client/options.go b/client/options.go
index 686075e..c301eeb 100644
--- a/client/options.go
+++ b/client/options.go
@@ -18,7 +18,6 @@
package client
import (
- "reflect"
"strconv"
"time"
)
@@ -31,7 +30,6 @@
"dubbo.apache.org/dubbo-go/v3/common"
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/dubboutil"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
@@ -41,9 +39,10 @@
)
type ReferenceOptions struct {
- Reference *global.ReferenceConfig
- cliOpts *ClientOptions
- Registries map[string]*global.RegistryConfig
+ Reference *global.ReferenceConfig
+ Consumer *global.ConsumerConfig
+ Metrics *global.MetricsConfig
+ Otel *global.OtelConfig
pxy *proxy.Proxy
id string
@@ -60,10 +59,12 @@
func defaultReferenceOptions() *ReferenceOptions {
return &ReferenceOptions{
Reference: global.DefaultReferenceConfig(),
+ Metrics: global.DefaultMetricsConfig(),
+ Otel: global.DefaultOtelConfig(),
}
}
-func (refOpts *ReferenceOptions) init(cli *Client, opts ...ReferenceOption) error {
+func (refOpts *ReferenceOptions) init(opts ...ReferenceOption) error {
for _, opt := range opts {
opt(refOpts)
}
@@ -71,11 +72,19 @@
return err
}
- refOpts.cliOpts = cli.cliOpts
- dubboutil.CopyFields(reflect.ValueOf(refOpts.cliOpts.Consumer).Elem(), reflect.ValueOf(refOpts.Reference).Elem())
-
ref := refOpts.Reference
+ app := refOpts.applicationCompat
+ if app != nil {
+ refOpts.metaDataType = app.MetadataType
+ if ref.Group == "" {
+ ref.Group = app.Group
+ }
+ if ref.Version == "" {
+ ref.Version = app.Version
+ }
+ }
+
// init method
methods := ref.Methods
if length := len(methods); length > 0 {
@@ -88,55 +97,43 @@
}
}
- // init application
- application := refOpts.cliOpts.Application
- if application != nil {
- refOpts.applicationCompat = compatApplicationConfig(application)
- if err := refOpts.applicationCompat.Init(); err != nil {
- return err
- }
- refOpts.metaDataType = refOpts.applicationCompat.MetadataType
- if ref.Group == "" {
- ref.Group = refOpts.applicationCompat.Group
- }
- if ref.Version == "" {
- ref.Version = refOpts.applicationCompat.Version
- }
- }
// init cluster
if ref.Cluster == "" {
ref.Cluster = "failover"
}
- // todo(DMwangnima): move to registry package
// init registries
- var emptyRegIDsFlag bool
- if ref.RegistryIDs == nil || len(ref.RegistryIDs) <= 0 {
- emptyRegIDsFlag = true
- }
-
- // set client level as default registry
- regs := refOpts.Registries
- if regs == nil {
- regs = cli.cliOpts.Registries
- }
-
- if regs != nil {
- refOpts.registriesCompat = make(map[string]*config.RegistryConfig)
- for key, reg := range regs {
- refOpts.registriesCompat[key] = compatRegistryConfig(reg)
- if err := refOpts.registriesCompat[key].Init(); err != nil {
- return err
- }
- if emptyRegIDsFlag {
+ if len(refOpts.registriesCompat) > 0 {
+ regs := refOpts.registriesCompat
+ if len(ref.RegistryIDs) <= 0 {
+ ref.RegistryIDs = make([]string, len(regs))
+ for key := range regs {
ref.RegistryIDs = append(ref.RegistryIDs, key)
}
}
- }
- ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs)
+ ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs)
- // init graceful_shutdown
- graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(refOpts.cliOpts.Shutdown))
+ newRegs := make(map[string]*config.RegistryConfig)
+ for _, id := range ref.RegistryIDs {
+ if reg, ok := regs[id]; ok {
+ newRegs[id] = reg
+ }
+ }
+ refOpts.registriesCompat = newRegs
+ }
+
+ // init protocol
+ if ref.Protocol == "" {
+ ref.Protocol = "tri"
+ if refOpts.Consumer != nil && refOpts.Consumer.Protocol != "" {
+ ref.Protocol = refOpts.Consumer.Protocol
+ }
+ }
+
+ // init serialization
+ if ref.Serialization == "" {
+ ref.Serialization = constant.ProtobufSerialization
+ }
return commonCfg.Verify(refOpts)
}
@@ -158,15 +155,13 @@
}
}
-// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithFilter(filter string) ReferenceOption {
return func(opts *ReferenceOptions) {
opts.Reference.Filter = filter
}
}
-// todo(DMwangnima): think about a more ideal configuration style
-func WithRegistryIDs(registryIDs []string) ReferenceOption {
+func WithRegistryIDs(registryIDs ...string) ReferenceOption {
return func(opts *ReferenceOptions) {
if len(registryIDs) > 0 {
opts.Reference.RegistryIDs = registryIDs
@@ -174,17 +169,6 @@
}
}
-func WithRegistry(opts ...registry.Option) ReferenceOption {
- regOpts := registry.NewOptions(opts...)
-
- return func(refOpts *ReferenceOptions) {
- if refOpts.Registries == nil {
- refOpts.Registries = make(map[string]*global.RegistryConfig)
- }
- refOpts.Registries[regOpts.ID] = regOpts.Registry
- }
-}
-
// ========== Cluster Strategy ==========
func WithClusterAvailable() ReferenceOption {
@@ -303,12 +287,18 @@
}
}
-func WithJSON() ReferenceOption {
+func WithSerializationJSON() ReferenceOption {
return func(opts *ReferenceOptions) {
opts.Reference.Serialization = constant.JSONSerialization
}
}
+func WithSerialization(serialization string) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.Reference.Serialization = serialization
+ }
+}
+
func WithProvidedBy(providedBy string) ReferenceOption {
return func(opts *ReferenceOptions) {
opts.Reference.ProvidedBy = providedBy
@@ -323,6 +313,9 @@
func WithParams(params map[string]string) ReferenceOption {
return func(opts *ReferenceOptions) {
+ if len(params) <= 0 {
+ return
+ }
opts.Reference.Params = params
}
}
@@ -333,9 +326,9 @@
}
}
-func WithSticky(sticky bool) ReferenceOption {
+func WithSticky() ReferenceOption {
return func(opts *ReferenceOptions) {
- opts.Reference.Sticky = sticky
+ opts.Reference.Sticky = true
}
}
@@ -406,15 +399,33 @@
// ---------- For framework ----------
// These functions should not be invoked by users
-func SetRegistries(regs map[string]*global.RegistryConfig) ReferenceOption {
+func setReference(reference *global.ReferenceConfig) ReferenceOption {
return func(opts *ReferenceOptions) {
- opts.Registries = regs
+ opts.Reference = reference
}
}
-func SetReference(reference *global.ReferenceConfig) ReferenceOption {
+func setInterfaceName(interfaceName string) ReferenceOption {
return func(opts *ReferenceOptions) {
- opts.Reference = reference
+ opts.Reference.InterfaceName = interfaceName
+ }
+}
+
+func setApplicationCompat(app *config.ApplicationConfig) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.applicationCompat = app
+ }
+}
+
+func setRegistriesCompat(regs map[string]*config.RegistryConfig) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.registriesCompat = regs
+ }
+}
+
+func setConsumer(consumer *global.ConsumerConfig) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.Consumer = consumer
}
}
@@ -425,16 +436,21 @@
Shutdown *global.ShutdownConfig
Metrics *global.MetricsConfig
Otel *global.OtelConfig
+
+ overallReference *global.ReferenceConfig
+ applicationCompat *config.ApplicationConfig
+ registriesCompat map[string]*config.RegistryConfig
}
func defaultClientOptions() *ClientOptions {
return &ClientOptions{
- Consumer: global.DefaultConsumerConfig(),
- Registries: make(map[string]*global.RegistryConfig),
- Application: global.DefaultApplicationConfig(),
- Shutdown: global.DefaultShutdownConfig(),
- Metrics: global.DefaultMetricsConfig(),
- Otel: global.DefaultOtelConfig(),
+ Consumer: global.DefaultConsumerConfig(),
+ Registries: make(map[string]*global.RegistryConfig),
+ Application: global.DefaultApplicationConfig(),
+ Shutdown: global.DefaultShutdownConfig(),
+ Metrics: global.DefaultMetricsConfig(),
+ Otel: global.DefaultOtelConfig(),
+ overallReference: global.DefaultReferenceConfig(),
}
}
@@ -445,6 +461,59 @@
if err := defaults.Set(cliOpts); err != nil {
return err
}
+
+ con := cliOpts.Consumer
+
+ // init application
+ application := cliOpts.Application
+ if application != nil {
+ cliOpts.applicationCompat = compatApplicationConfig(application)
+ if err := cliOpts.applicationCompat.Init(); err != nil {
+ return err
+ }
+ }
+
+ // init registries
+ regs := cliOpts.Registries
+ if regs != nil {
+ cliOpts.registriesCompat = make(map[string]*config.RegistryConfig)
+ if len(con.RegistryIDs) <= 0 {
+ con.RegistryIDs = make([]string, len(regs))
+ for key := range regs {
+ con.RegistryIDs = append(con.RegistryIDs, key)
+ }
+ }
+ con.RegistryIDs = commonCfg.TranslateIds(con.RegistryIDs)
+
+ for _, id := range con.RegistryIDs {
+ if reg, ok := regs[id]; ok {
+ cliOpts.registriesCompat[id] = compatRegistryConfig(reg)
+ if err := cliOpts.registriesCompat[id].Init(); err != nil {
+ return err
+ }
+ }
+ }
+ }
+
+ // init cluster
+ if cliOpts.overallReference.Cluster == "" {
+ cliOpts.overallReference.Cluster = constant.ClusterKeyFailover
+ }
+
+ // init protocol
+ if cliOpts.Consumer.Protocol == "" {
+ cliOpts.Consumer.Protocol = "tri"
+ }
+
+ // init serialization
+ if cliOpts.overallReference.Serialization == "" {
+ cliOpts.overallReference.Serialization = constant.ProtobufSerialization
+ }
+
+ // todo(DMwangnima): is there any part that we should do compatibility processing?
+
+ // init graceful_shutdown
+ graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(cliOpts.Shutdown))
return nil
}
@@ -458,19 +527,20 @@
func WithClientURL(url string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.URL = url
+ opts.overallReference.URL = url
}
}
// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithClientFilter(filter string) ClientOption {
return func(opts *ClientOptions) {
+ // todo: move this to overallReference
opts.Consumer.Filter = filter
}
}
// todo(DMwangnima): think about a more ideal configuration style
-func WithClientRegistryIDs(registryIDs []string) ClientOption {
+func WithClientRegistryIDs(registryIDs ...string) ClientOption {
return func(opts *ClientOptions) {
if len(registryIDs) > 0 {
opts.Consumer.RegistryIDs = registryIDs
@@ -482,9 +552,6 @@
regOpts := registry.NewOptions(opts...)
return func(cliOpts *ClientOptions) {
- if cliOpts.Registries == nil {
- cliOpts.Registries = make(map[string]*global.RegistryConfig)
- }
cliOpts.Registries[regOpts.ID] = regOpts.Registry
}
}
@@ -501,55 +568,61 @@
func WithClientClusterAvailable() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyAvailable
+ opts.overallReference.Cluster = constant.ClusterKeyAvailable
}
}
func WithClientClusterBroadcast() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyBroadcast
+ opts.overallReference.Cluster = constant.ClusterKeyBroadcast
}
}
func WithClientClusterFailBack() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailback
+ opts.overallReference.Cluster = constant.ClusterKeyFailback
}
}
func WithClientClusterFailFast() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailfast
+ opts.overallReference.Cluster = constant.ClusterKeyFailfast
}
}
func WithClientClusterFailOver() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailover
+ opts.overallReference.Cluster = constant.ClusterKeyFailover
}
}
func WithClientClusterFailSafe() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailsafe
+ opts.overallReference.Cluster = constant.ClusterKeyFailsafe
}
}
func WithClientClusterForking() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyForking
+ opts.overallReference.Cluster = constant.ClusterKeyForking
}
}
func WithClientClusterZoneAware() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyZoneAware
+ opts.overallReference.Cluster = constant.ClusterKeyZoneAware
}
}
func WithClientClusterAdaptiveService() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyAdaptiveService
+ opts.overallReference.Cluster = constant.ClusterKeyAdaptiveService
+ }
+}
+
+func WithClientClusterStrategy(strategy string) ClientOption {
+ return func(opts *ClientOptions) {
+ opts.overallReference.Cluster = strategy
}
}
@@ -557,73 +630,75 @@
func WithClientLoadBalanceConsistentHashing() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyConsistentHashing
+ opts.overallReference.Loadbalance = constant.LoadBalanceKeyConsistentHashing
}
}
func WithClientLoadBalanceLeastActive() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyLeastActive
+ opts.overallReference.Loadbalance = constant.LoadBalanceKeyLeastActive
}
}
func WithClientLoadBalanceRandom() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyRandom
+ opts.overallReference.Loadbalance = constant.LoadBalanceKeyRandom
}
}
func WithClientLoadBalanceRoundRobin() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyRoundRobin
+ opts.overallReference.Loadbalance = constant.LoadBalanceKeyRoundRobin
}
}
func WithClientLoadBalanceP2C() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyP2C
+ opts.overallReference.Loadbalance = constant.LoadBalanceKeyP2C
}
}
func WithClientLoadBalance(lb string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = lb
+ opts.overallReference.Loadbalance = lb
}
}
func WithClientRetries(retries int) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Retries = strconv.Itoa(retries)
+ opts.overallReference.Retries = strconv.Itoa(retries)
}
}
+// is this needed?
func WithClientGroup(group string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Group = group
+ opts.overallReference.Group = group
}
}
+// is this needed?
func WithClientVersion(version string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Version = version
+ opts.overallReference.Version = version
}
}
func WithClientSerializationJSON() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Serialization = constant.JSONSerialization
+ opts.overallReference.Serialization = constant.JSONSerialization
}
}
func WithClientSerialization(ser string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Serialization = ser
+ opts.overallReference.Serialization = ser
}
}
func WithClientProvidedBy(providedBy string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.ProvidedBy = providedBy
+ opts.overallReference.ProvidedBy = providedBy
}
}
@@ -636,16 +711,19 @@
func WithClientParams(params map[string]string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Params = params
+ if len(params) <= 0 {
+ return
+ }
+ opts.overallReference.Params = params
}
}
func WithClientParam(k, v string) ClientOption {
return func(opts *ClientOptions) {
- if opts.Consumer.Params == nil {
- opts.Consumer.Params = make(map[string]string)
+ if opts.overallReference.Params == nil {
+ opts.overallReference.Params = make(map[string]string, 8)
}
- opts.Consumer.Params[k] = v
+ opts.overallReference.Params[k] = v
}
}
@@ -660,9 +738,9 @@
// }
//}
-func WithClientSticky(sticky bool) ClientOption {
+func WithClientSticky() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Sticky = sticky
+ opts.overallReference.Sticky = true
}
}
@@ -700,13 +778,13 @@
func WithClientForceTag() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.ForceTag = true
+ opts.overallReference.ForceTag = true
}
}
func WithClientMeshProviderPort(port int) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.MeshProviderPort = port
+ opts.overallReference.MeshProviderPort = port
}
}
@@ -750,8 +828,6 @@
type CallOptions struct {
RequestTimeout string
Retries string
- Group string
- Version string
}
type CallOption func(*CallOptions)
@@ -773,15 +849,3 @@
opts.Retries = strconv.Itoa(retries)
}
}
-
-func WithCallGroup(group string) CallOption {
- return func(opts *CallOptions) {
- opts.Group = group
- }
-}
-
-func WithCallVersion(version string) CallOption {
- return func(opts *CallOptions) {
- opts.Version = version
- }
-}
diff --git a/client/options_test.go b/client/options_test.go
index 0016b91..c2edacc 100644
--- a/client/options_test.go
+++ b/client/options_test.go
@@ -17,37 +17,1179 @@
package client
-//func TestWithURL(t *testing.T) {
-// tests := []struct {
-// opts []ClientOption
-// justify func(t *testing.T, opts *ClientOptions)
-// }{
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+type newClientCase struct {
+ desc string
+ opts []ClientOption
+ verify func(t *testing.T, cli *Client, err error)
+}
+
+func processNewClientCases(t *testing.T, cases []newClientCase) {
+ for _, c := range cases {
+ t.Run(c.desc, func(t *testing.T) {
+ cli, err := NewClient(c.opts...)
+ c.verify(t, cli, err)
+ })
+ }
+}
+
+// ---------- ClientOption Testing ----------
+
+// todo: verify
+func TestWithClientURL(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "normal address",
+ opts: []ClientOption{
+ WithClientURL("127.0.0.1:20000"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "127.0.0.1:20000", cli.cliOpts.overallReference.URL)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientCheck(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config check",
+ opts: []ClientOption{
+ WithClientCheck(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, true, cli.cliOpts.Consumer.Check)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientFilter(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal filter",
+ opts: []ClientOption{
+ WithClientFilter("test_filter"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_filter", cli.cliOpts.Consumer.Filter)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRegistryIDs(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal ids",
+ opts: []ClientOption{
+ WithClientRegistryIDs("zk", "nacos"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, []string{"zk", "nacos"}, cli.cliOpts.Consumer.RegistryIDs)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRegistry(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config registry without setting id explicitly",
+ opts: []ClientOption{
+ WithClientRegistry(
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ reg, ok := cli.cliOpts.Registries[constant.ZookeeperKey]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", reg.Address)
+ regCompat, ok := cli.cliOpts.registriesCompat[constant.ZookeeperKey]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", regCompat.Address)
+ },
+ },
+ {
+ desc: "config registry without setting id",
+ opts: []ClientOption{
+ WithClientRegistry(
+ registry.WithID("zk"),
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ reg, ok := cli.cliOpts.Registries["zk"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", reg.Address)
+ regCompat, ok := cli.cliOpts.registriesCompat["zk"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", regCompat.Address)
+ },
+ },
+ {
+ desc: "config multiple registries with setting RegistryIds",
+ opts: []ClientOption{
+ WithClientRegistry(
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ WithClientRegistry(
+ registry.WithID("nacos_test"),
+ registry.WithNacos(),
+ registry.WithAddress("127.0.0.1:8848"),
+ ),
+ WithClientRegistryIDs("nacos_test"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ zkReg, ok := cli.cliOpts.Registries[constant.ZookeeperKey]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", zkReg.Address)
+ ncReg, ok := cli.cliOpts.Registries["nacos_test"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:8848", ncReg.Address)
+
+ _, ok = cli.cliOpts.registriesCompat[constant.ZookeeperKey]
+ assert.False(t, ok)
+ ncCompat, ok := cli.cliOpts.registriesCompat["nacos_test"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:8848", ncCompat.Address)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientShutdown(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config shutdown",
+ opts: []ClientOption{
+ WithClientShutdown(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ // we do not verify the internal fields of Shutdown since graceful_shutdown module is in charge of it
+ assert.NotNil(t, cli.cliOpts.Shutdown)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientCluster(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "default Cluster strategy",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config Available Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterAvailable(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyAvailable, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config Broadcast Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterBroadcast(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyBroadcast, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailBack Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailBack(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailback, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailFast Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailFast(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailfast, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailOver Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailOver(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailSafe Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailSafe(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailsafe, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config Forking Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterForking(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyForking, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config ZoneAware Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterZoneAware(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyZoneAware, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config AdaptiveService Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterAdaptiveService(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyAdaptiveService, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientLoadBalance(t *testing.T) {
+ cases := []newClientCase{
+ // todo(DMwangnima): think about default loadbalance strategy
+ //{
+ // desc: "default Cluster strategy",
+ // opts: []ClientOption{},
+ // verify: func(t *testing.T, cli *Client, err error) {
+ // assert.Nil(t, err)
+ // assert.Equal(t, constant.ClusterKeyFailover, cli.cliOpts.overallReference.Cluster)
+ // },
+ //},
+ {
+ desc: "config ConsistentHashing LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceConsistentHashing(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyConsistentHashing, cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config LeastActive LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceLeastActive(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyLeastActive, cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config Random LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceRandom(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyRandom, cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config RoundRobin LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceRoundRobin(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyRoundRobin, cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config P2C LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceP2C(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyP2C, cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRetries(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal retries",
+ opts: []ClientOption{
+ WithClientRetries(3),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "3", cli.cliOpts.overallReference.Retries)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientGroup(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal group",
+ opts: []ClientOption{
+ WithClientGroup("test_group"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_group", cli.cliOpts.overallReference.Group)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientVersion(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal version",
+ opts: []ClientOption{
+ WithClientVersion("test_version"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_version", cli.cliOpts.overallReference.Version)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientSerialization(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "default Serialization",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ProtobufSerialization, cli.cliOpts.overallReference.Serialization)
+ },
+ },
+ {
+ desc: "config JSON Serialization",
+ opts: []ClientOption{
+ WithClientSerializationJSON(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.JSONSerialization, cli.cliOpts.overallReference.Serialization)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientProvidedBy(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal ProvidedBy",
+ opts: []ClientOption{
+ WithClientProvidedBy("test_instance"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_instance", cli.cliOpts.overallReference.ProvidedBy)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientParams(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal params",
+ opts: []ClientOption{
+ WithClientParams(map[string]string{
+ "test_key": "test_val",
+ }),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key": "test_val"}, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config nil params",
+ opts: []ClientOption{
+ WithClientParams(nil),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config nil params with type information",
+ opts: []ClientOption{
+ WithClientParams((map[string]string)(nil)),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config params without key-val",
+ opts: []ClientOption{
+ WithClientParams(map[string]string{}),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, cli.cliOpts.overallReference.Params)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientParam(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal param",
+ opts: []ClientOption{
+ WithClientParam("test_key", "test_val"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key": "test_val"}, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config normal param multiple times",
+ opts: []ClientOption{
+ WithClientParam("test_key", "test_val"),
+ WithClientParam("test_key1", "test_val1"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key": "test_val", "test_key1": "test_val1"}, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config param with empty key",
+ opts: []ClientOption{
+ WithClientParam("", ""),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"": ""}, cli.cliOpts.overallReference.Params)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientSticky(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config sticky",
+ opts: []ClientOption{
+ WithClientSticky(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.True(t, cli.cliOpts.overallReference.Sticky)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientProtocol(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "default Protocol",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri", cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ {
+ desc: "config Dubbo Protocol",
+ opts: []ClientOption{
+ WithClientProtocolDubbo(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.Dubbo, cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ {
+ desc: "config Triple Protocol",
+ opts: []ClientOption{
+ WithClientProtocolTriple(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri", cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ {
+ desc: "config JsonRPC Protocol",
+ opts: []ClientOption{
+ WithClientProtocolJsonRPC(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "jsonrpc", cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRequestTimeout(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal RequestTimeout",
+ opts: []ClientOption{
+ WithClientRequestTimeout(6 * time.Second),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "6s", cli.cliOpts.Consumer.RequestTimeout)
+ },
+ },
+ // todo(DMwangnima): consider whether this default timeout is ideal
+ {
+ desc: "default RequestTimeout",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "3s", cli.cliOpts.Consumer.RequestTimeout)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientForceTag(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config ForceTag",
+ opts: []ClientOption{
+ WithClientForceTag(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.True(t, cli.cliOpts.overallReference.ForceTag)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientMeshProviderPort(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal MeshProviderPort",
+ opts: []ClientOption{
+ WithClientMeshProviderPort(20001),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, 20001, cli.cliOpts.overallReference.MeshProviderPort)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+// ---------- ReferenceOption Testing ----------
+
+type referenceOptionsInitCase struct {
+ desc string
+ opts []ReferenceOption
+ verify func(t *testing.T, refOpts *ReferenceOptions, err error)
+}
+
+func processReferenceOptionsInitCases(t *testing.T, cases []referenceOptionsInitCase) {
+ for _, c := range cases {
+ t.Run(c.desc, func(t *testing.T) {
+ defRefOpts := defaultReferenceOptions()
+ err := defRefOpts.init(c.opts...)
+ c.verify(t, defRefOpts, err)
+ })
+ }
+}
+
+func TestWithCheck(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config Check",
+ opts: []ReferenceOption{
+ WithCheck(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.True(t, *refOpts.Reference.Check)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithURL(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal URL",
+ opts: []ReferenceOption{
+ WithURL("127.0.0.1:20000"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "127.0.0.1:20000", refOpts.Reference.URL)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithFilter(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal filter",
+ opts: []ReferenceOption{
+ WithFilter("test_filter"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_filter", refOpts.Reference.Filter)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithRegistryIDs(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal ids",
+ opts: []ReferenceOption{
+ WithRegistryIDs("zk", "nacos"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, []string{"zk", "nacos"}, refOpts.Reference.RegistryIDs)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithCluster(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "default Cluster strategy",
+ opts: []ReferenceOption{},
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config Available Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterAvailable(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyAvailable, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config Broadcast Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterBroadcast(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyBroadcast, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailBack Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailBack(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailback, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailFast Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailFast(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailfast, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailOver Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailOver(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailSafe Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailSafe(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailsafe, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config Forking Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterForking(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyForking, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config ZoneAware Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterZoneAware(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyZoneAware, refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config AdaptiveService Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterAdaptiveService(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyAdaptiveService, refOpts.Reference.Cluster)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithLoadBalance(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ // todo(DMwangnima): think about default loadbalance strategy
+ //{
+ // desc: "default Cluster strategy",
+ // opts: []ClientOption{},
+ // verify: func(t *testing.T, cli *Client, err error) {
+ // assert.Nil(t, err)
+ // assert.Equal(t, constant.ClusterKeyFailover, cli.cliOpts.overallReference.Cluster)
+ // },
+ //},
+ {
+ desc: "config ConsistentHashing LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceConsistentHashing(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyConsistentHashing, refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config LeastActive LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceLeastActive(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyLeastActive, refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config Random LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceRandom(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyRandom, refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config RoundRobin LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceRoundRobin(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyRoundRobin, refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config P2C LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceP2C(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyP2C, refOpts.Reference.Loadbalance)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithRetries(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal retries",
+ opts: []ReferenceOption{
+ WithRetries(3),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "3", refOpts.Reference.Retries)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithGroup(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal group",
+ opts: []ReferenceOption{
+ WithGroup("test_group"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_group", refOpts.Reference.Group)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithVersion(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal version",
+ opts: []ReferenceOption{
+ WithVersion("test_version"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_version", refOpts.Reference.Version)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithSerialization(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "default Serialization",
+ opts: []ReferenceOption{},
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ProtobufSerialization, refOpts.Reference.Serialization)
+ },
+ },
+ {
+ desc: "config JSON Serialization",
+ opts: []ReferenceOption{
+ WithSerializationJSON(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.JSONSerialization, refOpts.Reference.Serialization)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithProvidedBy(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal ProvidedBy",
+ opts: []ReferenceOption{
+ WithProvidedBy("test_instance"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_instance", refOpts.Reference.ProvidedBy)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithParams(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal params",
+ opts: []ReferenceOption{
+ WithParams(map[string]string{
+ "test_key": "test_val",
+ }),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key": "test_val"}, refOpts.Reference.Params)
+ },
+ },
+ {
+ desc: "config nil params",
+ opts: []ReferenceOption{
+ WithParams(nil),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, refOpts.Reference.Params)
+ },
+ },
+ {
+ desc: "config nil params with type information",
+ opts: []ReferenceOption{
+ WithParams((map[string]string)(nil)),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, refOpts.Reference.Params)
+ },
+ },
+ {
+ desc: "config params without key-val",
+ opts: []ReferenceOption{
+ WithParams(map[string]string{}),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, refOpts.Reference.Params)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+//func TestWithClientParam(t *testing.T) {
+// cases := []newClientCase{
// {
+// desc: "config normal param",
// opts: []ClientOption{
-// WithClientURL("127.0.0.1:20000"),
+// WithClientParam("test_key", "test_val"),
// },
-// justify: func(t *testing.T, opts *ClientOptions) {
-// urls := opts.urls
-// assert.Equal(t, 1, len(urls))
-// assert.Equal(t, "tri", urls[0].Protocol)
+// verify: func(t *testing.T, cli *Client, err error) {
+// assert.Nil(t, err)
+// assert.Equal(t, map[string]string{"test_key": "test_val"}, cli.cliOpts.overallReference.Params)
// },
// },
// {
+// desc: "config normal param multiple times",
// opts: []ClientOption{
-// WithClientURL("tri://127.0.0.1:20000"),
+// WithClientParam("test_key", "test_val"),
+// WithClientParam("test_key1", "test_val1"),
// },
-// justify: func(t *testing.T, opts *ClientOptions) {
-// urls := opts.urls
-// assert.Equal(t, 1, len(urls))
-// assert.Equal(t, "tri", urls[0].Protocol)
+// verify: func(t *testing.T, cli *Client, err error) {
+// assert.Nil(t, err)
+// assert.Equal(t, map[string]string{"test_key": "test_val", "test_key1": "test_val1"}, cli.cliOpts.overallReference.Params)
+// },
+// },
+// {
+// desc: "config param with empty key",
+// opts: []ClientOption{
+// WithClientParam("", ""),
+// },
+// verify: func(t *testing.T, cli *Client, err error) {
+// assert.Nil(t, err)
+// assert.Equal(t, map[string]string{"": ""}, cli.cliOpts.overallReference.Params)
// },
// },
// }
-//
-// for _, test := range tests {
-// newOpts := defaultClientOptions()
-// assert.Nil(t, newOpts.init(test.opts...))
-// assert.Nil(t, newOpts.processURL(&common.URL{}))
-// test.justify(t, newOpts)
-// }
+// processNewClientCases(t, cases)
//}
+
+func TestWithSticky(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config sticky",
+ opts: []ReferenceOption{
+ WithSticky(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.True(t, refOpts.Reference.Sticky)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithProtocol(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "default Protocol",
+ opts: []ReferenceOption{},
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri", refOpts.Reference.Protocol)
+ },
+ },
+ {
+ desc: "config Dubbo Protocol",
+ opts: []ReferenceOption{
+ WithProtocolDubbo(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.Dubbo, refOpts.Reference.Protocol)
+ },
+ },
+ {
+ desc: "config Triple Protocol",
+ opts: []ReferenceOption{
+ WithProtocolTriple(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri", refOpts.Reference.Protocol)
+ },
+ },
+ {
+ desc: "config JsonRPC Protocol",
+ opts: []ReferenceOption{
+ WithProtocolJsonRPC(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "jsonrpc", refOpts.Reference.Protocol)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithRequestTimeout(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal RequestTimeout",
+ opts: []ReferenceOption{
+ WithRequestTimeout(6 * time.Second),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "6s", refOpts.Reference.RequestTimeout)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithForceTag(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config ForceTag",
+ opts: []ReferenceOption{
+ WithForceTag(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.True(t, refOpts.Reference.ForceTag)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithMeshProviderPort(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal MeshProviderPort",
+ opts: []ReferenceOption{
+ WithMeshProviderPort(20001),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, 20001, refOpts.Reference.MeshProviderPort)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
diff --git a/config/application_config.go b/config/application_config.go
index 6bdf837..c9de915 100644
--- a/config/application_config.go
+++ b/config/application_config.go
@@ -19,6 +19,7 @@
import (
"github.com/creasty/defaults"
+
"github.com/pkg/errors"
)
diff --git a/dubbo.go b/dubbo.go
index 2e6c2e1..b779b78 100644
--- a/dubbo.go
+++ b/dubbo.go
@@ -23,6 +23,7 @@
import (
"github.com/dubbogo/gost/log/logger"
+
"github.com/pkg/errors"
)
@@ -85,7 +86,7 @@
cliOpts = append(cliOpts,
client.WithClientFilter(conCfg.Filter),
// todo(DMwangnima): deal with Protocol
- client.WithClientRegistryIDs(conCfg.RegistryIDs),
+ client.WithClientRegistryIDs(conCfg.RegistryIDs...),
// todo(DMwangnima): deal with TracingKey
client.SetClientConsumer(conCfg),
)
@@ -206,7 +207,7 @@
logger.Fatalf("Failed to start server, err: %v", err)
}
}()
- return err
+ return nil
}
// loadConsumer loads the service consumer.
@@ -218,13 +219,14 @@
// refer services
conLock.RLock()
defer conLock.RUnlock()
- for _, definition := range consumerServices {
- if _, _, err = cli.Init(definition.Info); err != nil {
- return err
+ for intfName, definition := range consumerServices {
+ conn, dialErr := cli.DialWithInfo(intfName, definition.Info)
+ if dialErr != nil {
+ return dialErr
}
- definition.Info.ClientInjectFunc(definition.Svc, cli)
+ definition.Info.ConnectionInjectFunc(definition.Svc, conn)
}
- return err
+ return nil
}
// SetConsumerServiceWithInfo sets the consumer service with the client information.
diff --git a/global/consumer_config.go b/global/consumer_config.go
index d59bc63..ee7c21e 100644
--- a/global/consumer_config.go
+++ b/global/consumer_config.go
@@ -18,7 +18,6 @@
package global
type ConsumerConfig struct {
- ReferenceConfig
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
@@ -27,16 +26,17 @@
Check bool `yaml:"check" json:"check,omitempty" property:"check"`
AdaptiveService bool `default:"false" yaml:"adaptive-service" json:"adaptive-service" property:"adaptive-service"`
// there is no need to configure References, it will be replaced by instance.NewClient
- //References map[string]*client.ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
- TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"`
- FilterConf interface{} `yaml:"filter-conf" json:"filter-conf,omitempty" property:"filter-conf"`
- MaxWaitTimeForServiceDiscovery string `default:"3s" yaml:"max-wait-time-for-service-discovery" json:"max-wait-time-for-service-discovery,omitempty" property:"max-wait-time-for-service-discovery"`
- MeshEnabled bool `yaml:"mesh-enabled" json:"mesh-enabled,omitempty" property:"mesh-enabled"`
+ References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
+ TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"`
+ FilterConf interface{} `yaml:"filter-conf" json:"filter-conf,omitempty" property:"filter-conf"`
+ MaxWaitTimeForServiceDiscovery string `default:"3s" yaml:"max-wait-time-for-service-discovery" json:"max-wait-time-for-service-discovery,omitempty" property:"max-wait-time-for-service-discovery"`
+ MeshEnabled bool `yaml:"mesh-enabled" json:"mesh-enabled,omitempty" property:"mesh-enabled"`
}
func DefaultConsumerConfig() *ConsumerConfig {
return &ConsumerConfig{
RequestTimeout: "3s",
Check: true,
+ References: make(map[string]*ReferenceConfig),
}
}
diff --git a/global/reference_config.go b/global/reference_config.go
index 94070fd..5acb642 100644
--- a/global/reference_config.go
+++ b/global/reference_config.go
@@ -50,9 +50,9 @@
func DefaultReferenceConfig() *ReferenceConfig {
return &ReferenceConfig{
// use Triple protocol by default
- Protocol: "tri",
- Methods: make([]*MethodConfig, 0, 8),
- Params: make(map[string]string, 8),
+ //Protocol: "tri",
+ Methods: make([]*MethodConfig, 0, 8),
+ //Params: make(map[string]string, 8),
}
}
diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 92f7132..ec7cbfd 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -19,7 +19,6 @@
import (
"bytes"
- "strconv"
"time"
)
@@ -70,11 +69,12 @@
svc.Version = invocation.GetAttachmentWithDefaultValue(constant.VersionKey, "")
svc.Group = invocation.GetAttachmentWithDefaultValue(constant.GroupKey, "")
svc.Method = invocation.MethodName()
- timeout, err := strconv.Atoi(invocation.GetAttachmentWithDefaultValue(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
- if err != nil {
- // it will be wrapped in readwrite.Write .
- return nil, perrors.WithStack(err)
- }
+ //timeout, err := strconv.Atoi(invocation.GetAttachmentWithDefaultValue(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
+ timeout := 300000
+ //if err != nil {
+ // // it will be wrapped in readwrite.Write .
+ // return nil, perrors.WithStack(err)
+ //}
svc.Timeout = time.Duration(timeout)
header := impl.DubboHeader{}
diff --git a/protocol/dubbo/example/new/client/main.go b/protocol/dubbo/example/new/client/main.go
new file mode 100644
index 0000000..016d635
--- /dev/null
+++ b/protocol/dubbo/example/new/client/main.go
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/client"
+ _ "dubbo.apache.org/dubbo-go/v3/imports"
+)
+
+func main() {
+ cli, err := client.NewClient(
+ client.WithClientProtocolDubbo(),
+ )
+ if err != nil {
+ panic(err)
+ }
+ conn, err := cli.Dial("GreetProvider",
+ client.WithURL("127.0.0.1:20000"),
+ )
+ if err != nil {
+ panic(err)
+ }
+ var resp string
+ if err := conn.CallUnary(context.Background(), []interface{}{"hello", "new", "dubbo"}, &resp, "Greet"); err != nil {
+ logger.Errorf("GreetProvider.Greet err: %s", err)
+ return
+ }
+ logger.Infof("Get Response: %s", resp)
+}
diff --git a/protocol/dubbo/example/new/server/main.go b/protocol/dubbo/example/new/server/main.go
new file mode 100644
index 0000000..19620f0
--- /dev/null
+++ b/protocol/dubbo/example/new/server/main.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ _ "dubbo.apache.org/dubbo-go/v3/imports"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/server"
+)
+
+type GreetProvider struct {
+}
+
+func (*GreetProvider) Greet(req string, req1 string, req2 string) (string, error) {
+ return req + req1 + req2, nil
+}
+
+func main() {
+ srv, err := server.NewServer(
+ server.WithServerProtocol(
+ protocol.WithDubbo(),
+ protocol.WithPort(20000),
+ ),
+ )
+ if err != nil {
+ panic(err)
+ }
+ if err := srv.Register(&GreetProvider{}, nil, server.WithInterface("GreetProvider")); err != nil {
+ panic(err)
+ }
+ if err := srv.Serve(); err != nil {
+ panic(err)
+ }
+}
diff --git a/protocol/triple/health/triple_health/health.triple.go b/protocol/triple/health/triple_health/health.triple.go
index 79de164..908737f 100644
--- a/protocol/triple/health/triple_health/health.triple.go
+++ b/protocol/triple/health/triple_health/health.triple.go
@@ -74,37 +74,31 @@
// NewHealth constructs a client for the grpc.health.v1.Health service.
func NewHealth(cli *client.Client, opts ...client.ReferenceOption) (Health, error) {
- group, version, err := cli.Init(&Health_ClientInfo, opts...)
+ conn, err := cli.Dial("grpc.health.v1.Health", opts...)
if err != nil {
return nil, err
}
return &HealthImpl{
- cli: cli,
- group: group,
- version: version,
+ conn: conn,
}, nil
}
// HealthImpl implements Health.
type HealthImpl struct {
- cli *client.Client
- group string
- version string
+ conn *client.Connection
}
func (c *HealthImpl) Check(ctx context.Context, req *HealthCheckRequest, opts ...client.CallOption) (*HealthCheckResponse, error) {
- opts = appendGroupVersion(opts, c)
resp := new(HealthCheckResponse)
- if err := c.cli.CallUnary(ctx, req, resp, "grpc.health.v1.Health", "Check", opts...); err != nil {
+ if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "Check", opts...); err != nil {
return nil, err
}
return resp, nil
}
func (c *HealthImpl) Watch(ctx context.Context, req *HealthCheckRequest, opts ...client.CallOption) (Health_WatchClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallServerStream(ctx, req, "grpc.health.v1.Health", "Watch", opts...)
+ stream, err := c.conn.CallServerStream(ctx, req, "Watch", opts...)
if err != nil {
return nil, err
}
@@ -112,12 +106,6 @@
return &HealthWatchClient{rawStream}, nil
}
-func appendGroupVersion(opts []client.CallOption, c *HealthImpl) []client.CallOption {
- opts = append(opts, client.WithCallGroup(c.group))
- opts = append(opts, client.WithCallVersion(c.version))
- return opts
-}
-
type Health_WatchClient interface {
Recv() bool
ResponseHeader() http.Header
@@ -152,9 +140,9 @@
var Health_ClientInfo = client.ClientInfo{
InterfaceName: "grpc.health.v1.Health",
MethodNames: []string{"Check", "Watch"},
- ClientInjectFunc: func(dubboCliRaw interface{}, cli *client.Client) {
+ ConnectionInjectFunc: func(dubboCliRaw interface{}, conn *client.Connection) {
dubboCli := dubboCliRaw.(HealthImpl)
- dubboCli.cli = cli
+ dubboCli.conn = conn
},
}
diff --git a/protocol/triple/internal/client/cmd_instance_with_registry/main.go b/protocol/triple/internal/client/cmd_instance_with_registry/main.go
index b3e0b0d..7da6d0b 100644
--- a/protocol/triple/internal/client/cmd_instance_with_registry/main.go
+++ b/protocol/triple/internal/client/cmd_instance_with_registry/main.go
@@ -42,7 +42,7 @@
}
// configure the params that only client layer cares
cli, err := ins.NewClient(
- client.WithClientRegistryIDs([]string{"zk"}),
+ client.WithClientRegistryIDs("zk"),
)
if err != nil {
panic(err)
diff --git a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
index 45d3275..eeb15c2 100644
--- a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
+++ b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
@@ -90,15 +90,12 @@
// NewGreetService constructs a client for the greet.GreetService service.
func NewGreetService(cli *client.Client, opts ...client.ReferenceOption) (GreetService, error) {
- group, version, err := cli.Init(&GreetService_ClientInfo, opts...)
+ conn, err := cli.DialWithInfo("greet.GreetService", &GreetService_ClientInfo, opts...)
if err != nil {
return nil, err
}
-
return &GreetServiceImpl{
- cli: cli,
- group: group,
- version: version,
+ conn: conn,
}, nil
}
@@ -108,23 +105,19 @@
// GreetServiceImpl implements GreetService.
type GreetServiceImpl struct {
- cli *client.Client
- group string
- version string
+ conn *client.Connection
}
func (c *GreetServiceImpl) Greet(ctx context.Context, req *proto.GreetRequest, opts ...client.CallOption) (*proto.GreetResponse, error) {
resp := new(proto.GreetResponse)
- opts = appendGroupVersion(opts, c)
- if err := c.cli.CallUnary(ctx, req, resp, "greet.GreetService", "Greet", opts...); err != nil {
+ if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "Greet", opts...); err != nil {
return nil, err
}
return resp, nil
}
func (c *GreetServiceImpl) GreetStream(ctx context.Context, opts ...client.CallOption) (GreetService_GreetStreamClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallBidiStream(ctx, "greet.GreetService", "GreetStream", opts...)
+ stream, err := c.conn.CallBidiStream(ctx, "GreetStream", opts...)
if err != nil {
return nil, err
}
@@ -133,8 +126,7 @@
}
func (c *GreetServiceImpl) GreetClientStream(ctx context.Context, opts ...client.CallOption) (GreetService_GreetClientStreamClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallClientStream(ctx, "greet.GreetService", "GreetClientStream", opts...)
+ stream, err := c.conn.CallClientStream(ctx, "GreetClientStream", opts...)
if err != nil {
return nil, err
}
@@ -143,8 +135,7 @@
}
func (c *GreetServiceImpl) GreetServerStream(ctx context.Context, req *proto.GreetServerStreamRequest, opts ...client.CallOption) (GreetService_GreetServerStreamClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallServerStream(ctx, req, "greet.GreetService", "GreetServerStream", opts...)
+ stream, err := c.conn.CallServerStream(ctx, req, "GreetServerStream", opts...)
if err != nil {
return nil, err
}
@@ -152,12 +143,6 @@
return &GreetServiceGreetServerStreamClient{rawStream}, nil
}
-func appendGroupVersion(opts []client.CallOption, c *GreetServiceImpl) []client.CallOption {
- opts = append(opts, client.WithCallGroup(c.group))
- opts = append(opts, client.WithCallVersion(c.version))
- return opts
-}
-
type GreetService_GreetStreamClient interface {
Spec() triple_protocol.Spec
Peer() triple_protocol.Peer
@@ -250,9 +235,9 @@
var GreetService_ClientInfo = client.ClientInfo{
InterfaceName: "greet.GreetService",
MethodNames: []string{"Greet", "GreetStream", "GreetClientStream", "GreetServerStream"},
- ClientInjectFunc: func(dubboCliRaw interface{}, cli *client.Client) {
- dubboCli := dubboCliRaw.(*GreetServiceImpl)
- dubboCli.cli = cli
+ ConnectionInjectFunc: func(dubboCliRaw interface{}, conn *client.Connection) {
+ dubboCli := dubboCliRaw.(GreetServiceImpl)
+ dubboCli.conn = conn
},
}
diff --git a/protocol/triple/triple_invoker.go b/protocol/triple/triple_invoker.go
index 86eeaa6..525fae6 100644
--- a/protocol/triple/triple_invoker.go
+++ b/protocol/triple/triple_invoker.go
@@ -85,6 +85,7 @@
// e.g. Client.CallUnary(... req, resp []interface, ...)
// inRaw represents req and resp, inRawLen represents 2.
inRaw := invocation.ParameterRawValues()
+ invocation.Reply()
inRawLen := len(inRaw)
method := invocation.MethodName()
// todo(DMwangnima): process headers(metadata) passed in
diff --git a/server/action.go b/server/action.go
index 46f41ac..f32b3db 100644
--- a/server/action.go
+++ b/server/action.go
@@ -133,33 +133,35 @@
}
func (svcOpts *ServiceOptions) export(info *ServiceInfo) error {
- srv := svcOpts.Service
+ svc := svcOpts.Service
if info != nil {
- srv.Interface = info.InterfaceName
+ if svc.Interface == "" {
+ svc.Interface = info.InterfaceName
+ }
svcOpts.Id = info.InterfaceName
svcOpts.info = info
}
// TODO: delay needExport
if svcOpts.unexported != nil && svcOpts.unexported.Load() {
- err := perrors.Errorf("The service %v has already unexported!", srv.Interface)
+ err := perrors.Errorf("The service %v has already unexported!", svc.Interface)
logger.Errorf(err.Error())
return err
}
if svcOpts.exported != nil && svcOpts.exported.Load() {
- logger.Warnf("The service %v has already exported!", srv.Interface)
+ logger.Warnf("The service %v has already exported!", svc.Interface)
return nil
}
regUrls := make([]*common.URL, 0)
- if !srv.NotRegister {
- regUrls = config.LoadRegistries(srv.RegistryIDs, svcOpts.registriesCompat, common.PROVIDER)
+ if !svc.NotRegister {
+ regUrls = config.LoadRegistries(svc.RegistryIDs, svcOpts.registriesCompat, common.PROVIDER)
}
urlMap := svcOpts.getUrlMap()
- protocolConfigs := loadProtocol(srv.ProtocolIDs, svcOpts.protocolsCompat)
+ protocolConfigs := loadProtocol(svc.ProtocolIDs, svcOpts.protocolsCompat)
if len(protocolConfigs) == 0 {
- logger.Warnf("The service %v'svcOpts '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", srv.Interface, srv.ProtocolIDs)
+ logger.Warnf("The service %v'svcOpts '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", svc.Interface, svc.ProtocolIDs)
return nil
}
@@ -173,10 +175,10 @@
// todo(DMwangnimg): finish replacing procedure
// registry the service reflect
- methods, err := common.ServiceMap.Register(srv.Interface, proto.Name, srv.Group, srv.Version, svcOpts.rpcService)
+ methods, err := common.ServiceMap.Register(svc.Interface, proto.Name, svc.Group, svc.Version, svcOpts.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v needExport the protocol %v error! Error message is %v.",
- srv.Interface, proto.Name, err.Error())
+ svc.Interface, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}
@@ -187,7 +189,7 @@
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
- common.WithPath(srv.Interface),
+ common.WithPath(svc.Interface),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
@@ -197,14 +199,14 @@
common.WithMethods(strings.Split(methods, ",")),
// todo(DMwangnima): remove this
common.WithAttribute(constant.ServiceInfoKey, info),
- common.WithToken(srv.Token),
+ common.WithToken(svc.Token),
common.WithParamsValue(constant.MetadataTypeKey, svcOpts.metadataType),
// fix https://github.com/apache/dubbo-go/issues/2176
common.WithParamsValue(constant.MaxServerSendMsgSize, proto.MaxServerSendMsgSize),
common.WithParamsValue(constant.MaxServerRecvMsgSize, proto.MaxServerRecvMsgSize),
)
- if len(srv.Tag) > 0 {
- ivkURL.AddParam(constant.Tagkey, srv.Tag)
+ if len(svc.Tag) > 0 {
+ ivkURL.AddParam(constant.Tagkey, svc.Tag)
}
// post process the URL to be exported
diff --git a/server/options.go b/server/options.go
index ca84f4c..94b0db3 100644
--- a/server/options.go
+++ b/server/options.go
@@ -581,6 +581,12 @@
// ---------- For user ----------
+func WithInterface(intf string) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Interface = intf
+ }
+}
+
// todo(DMwangnima): think about a more ideal configuration style
func WithRegistryIDs(registryIDs []string) ServiceOption {
return func(cfg *ServiceOptions) {
diff --git a/server/server.go b/server/server.go
index 2cd0150..a76d9cd 100644
--- a/server/server.go
+++ b/server/server.go
@@ -25,6 +25,8 @@
)
import (
+ "github.com/dubbogo/gost/log/logger"
+
"github.com/pkg/errors"
)
@@ -164,9 +166,16 @@
}
func (s *Server) exportServices() (err error) {
- s.svcOptsMap.Range(func(newSvcOpts, info interface{}) bool {
- err = newSvcOpts.(*ServiceOptions).ExportWithInfo(info.(*ServiceInfo))
+ s.svcOptsMap.Range(func(svcOptsRaw, infoRaw interface{}) bool {
+ svcOpts := svcOptsRaw.(*ServiceOptions)
+ if infoRaw == nil {
+ err = svcOpts.ExportWithoutInfo()
+ } else {
+ info := infoRaw.(*ServiceInfo)
+ err = svcOpts.ExportWithInfo(info)
+ }
if err != nil {
+ logger.Errorf("export %s service failed, err: %s", svcOpts.Service.Interface, err)
return false
}
return true