| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package client |
| |
| import ( |
| "fmt" |
| "net/url" |
| "os" |
| "strconv" |
| "time" |
| ) |
| |
| import ( |
| "github.com/creasty/defaults" |
| |
| "github.com/dubbogo/gost/log/logger" |
| gxstrings "github.com/dubbogo/gost/strings" |
| |
| constant2 "github.com/dubbogo/triple/pkg/common/constant" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" |
| "dubbo.apache.org/dubbo-go/v3/common" |
| commonCfg "dubbo.apache.org/dubbo-go/v3/common/config" |
| "dubbo.apache.org/dubbo-go/v3/common/constant" |
| "dubbo.apache.org/dubbo-go/v3/common/extension" |
| "dubbo.apache.org/dubbo-go/v3/protocol" |
| "dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper" |
| "dubbo.apache.org/dubbo-go/v3/proxy" |
| "dubbo.apache.org/dubbo-go/v3/registry" |
| ) |
| |
| // ReferenceConfig is the configuration of service consumer |
| type ReferenceConfig struct { |
| pxy *proxy.Proxy |
| id string |
| InterfaceName string `yaml:"interface" json:"interface,omitempty" property:"interface"` |
| Check *bool `yaml:"check" json:"check,omitempty" property:"check"` |
| URL string `yaml:"url" json:"url,omitempty" property:"url"` |
| Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` |
| Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"` |
| RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"` |
| Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` |
| Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` |
| Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` |
| Group string `yaml:"group" json:"group,omitempty" property:"group"` |
| Version string `yaml:"version" json:"version,omitempty" property:"version"` |
| Serialization string `yaml:"serialization" json:"serialization" property:"serialization"` |
| ProvidedBy string `yaml:"provided_by" json:"provided_by,omitempty" property:"provided_by"` |
| Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` |
| Async bool `yaml:"async" json:"async,omitempty" property:"async"` |
| Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` |
| invoker protocol.Invoker |
| urls []*common.URL |
| Generic string `yaml:"generic" json:"generic,omitempty" property:"generic"` |
| Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` |
| RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` |
| ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"` |
| TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"` |
| //rootConfig *config.RootConfig |
| metaDataType string |
| MeshProviderPort int `yaml:"mesh-provider-port" json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"` |
| |
| info *ClientInfo |
| |
| // configuration from ConsumerConfig |
| meshEnabled bool |
| adaptiveService bool |
| proxyFactory string |
| |
| application *commonCfg.ApplicationConfig |
| |
| registries map[string]*registry.RegistryConfig |
| } |
| |
| func (rc *ReferenceConfig) Prefix() string { |
| return constant.ReferenceConfigPrefix + rc.InterfaceName + "." |
| } |
| |
| func (rc *ReferenceConfig) Init(opts ...ReferenceOption) error { |
| for _, method := range rc.Methods { |
| if err := method.Init(); err != nil { |
| return err |
| } |
| } |
| if err := defaults.Set(rc); err != nil { |
| return err |
| } |
| for _, opt := range opts { |
| opt(rc) |
| } |
| if rc.application != nil { |
| rc.metaDataType = rc.application.MetadataType |
| if rc.Group == "" { |
| rc.Group = rc.application.Group |
| } |
| if rc.Version == "" { |
| rc.Version = rc.application.Version |
| } |
| } |
| if rc.Cluster == "" { |
| rc.Cluster = "failover" |
| } |
| // todo: move to registry package |
| rc.RegistryIDs = commonCfg.TranslateIds(rc.RegistryIDs) |
| |
| return commonCfg.Verify(rc) |
| } |
| |
| func getEnv(key, fallback string) string { |
| if value, ok := os.LookupEnv(key); ok { |
| return value |
| } |
| return fallback |
| } |
| |
| func updateOrCreateMeshURL(rc *ReferenceConfig) { |
| if rc.URL != "" { |
| logger.Infof("URL specified explicitly %v", rc.URL) |
| } |
| |
| if !rc.meshEnabled { |
| return |
| } |
| if rc.Protocol != constant2.TRIPLE { |
| panic(fmt.Sprintf("Mesh mode enabled, Triple protocol expected but %v protocol found!", rc.Protocol)) |
| } |
| if rc.ProvidedBy == "" { |
| panic("Mesh mode enabled, provided-by should not be empty!") |
| } |
| |
| podNamespace := getEnv(constant.PodNamespaceEnvKey, constant.DefaultNamespace) |
| clusterDomain := getEnv(constant.ClusterDomainKey, constant.DefaultClusterDomain) |
| |
| var meshPort int |
| if rc.MeshProviderPort > 0 { |
| meshPort = rc.MeshProviderPort |
| } else { |
| meshPort = constant.DefaultMeshPort |
| } |
| |
| rc.URL = "tri://" + rc.ProvidedBy + "." + podNamespace + constant.SVC + clusterDomain + ":" + strconv.Itoa(meshPort) |
| } |
| |
| // Refer retrieves invokers from urls. |
| func (rc *ReferenceConfig) Refer(srv interface{}) { |
| rc.refer(nil, srv) |
| } |
| |
| func (rc *ReferenceConfig) ReferWithInfo(info *ClientInfo) { |
| rc.refer(info, nil) |
| } |
| |
| func (rc *ReferenceConfig) refer(info *ClientInfo, srv interface{}) { |
| var methods []string |
| if info != nil { |
| rc.InterfaceName = info.InterfaceName |
| methods = info.MethodNames |
| rc.id = info.InterfaceName |
| rc.info = info |
| } else { |
| rc.id = common.GetReference(srv) |
| } |
| // If adaptive service is enabled, |
| // the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively. |
| if rc.adaptiveService { |
| rc.Cluster = constant.ClusterKeyAdaptiveService |
| rc.Loadbalance = constant.LoadBalanceKeyP2C |
| } |
| |
| // cfgURL is an interface-level invoker url, in the other words, it represents an interface. |
| cfgURL := common.NewURLWithOptions( |
| common.WithPath(rc.InterfaceName), |
| common.WithProtocol(rc.Protocol), |
| common.WithMethods(methods), |
| common.WithParams(rc.getURLMap()), |
| common.WithParamsValue(constant.BeanNameKey, rc.id), |
| common.WithParamsValue(constant.MetadataTypeKey, rc.metaDataType), |
| ) |
| |
| if rc.ForceTag { |
| cfgURL.AddParam(constant.ForceUseTag, "true") |
| } |
| rc.postProcessConfig(cfgURL) |
| |
| // if mesh-enabled is set |
| updateOrCreateMeshURL(rc) |
| |
| // retrieving urls from config, and appending the urls to rc.urls |
| if rc.URL != "" { // use user-specific urls |
| /* |
| Two types of URL are allowed for rc.URL: |
| 1. direct url: server IP, that is, no need for a registry anymore |
| 2. registry url |
| They will be handled in different ways: |
| For example, we have a direct url and a registry url: |
| 1. "tri://localhost:10000" is a direct url |
| 2. "registry://localhost:2181" is a registry url. |
| Then, rc.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181". |
| The result of urlStrings is a string array: []string{"tri://localhost:10000", "registry://localhost:2181"}. |
| */ |
| urlStrings := gxstrings.RegSplit(rc.URL, "\\s*[;]+\\s*") |
| for _, urlStr := range urlStrings { |
| serviceURL, err := common.NewURL(urlStr) |
| if err != nil { |
| panic(fmt.Sprintf("url configuration error, please check your configuration, user specified URL %v refer error, error message is %v ", urlStr, err.Error())) |
| } |
| if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol |
| serviceURL.SubURL = cfgURL |
| rc.urls = append(rc.urls, serviceURL) |
| } else { // serviceURL in this branch is the target endpoint IP address |
| if serviceURL.Path == "" { |
| serviceURL.Path = "/" + rc.InterfaceName |
| } |
| // replace params of serviceURL with params of cfgUrl |
| // other stuff, e.g. IP, port, etc., are same as serviceURL |
| newURL := common.MergeURL(serviceURL, cfgURL) |
| newURL.AddParam("peer", "true") |
| rc.urls = append(rc.urls, newURL) |
| } |
| } |
| } else { // use registry configs |
| rc.urls = registry.LoadRegistries(rc.RegistryIDs, rc.registries, common.CONSUMER) |
| // set url to regURLs |
| for _, regURL := range rc.urls { |
| regURL.SubURL = cfgURL |
| } |
| } |
| |
| // Get invokers according to rc.urls |
| var ( |
| invoker protocol.Invoker |
| regURL *common.URL |
| ) |
| invokers := make([]protocol.Invoker, len(rc.urls)) |
| for i, u := range rc.urls { |
| if u.Protocol == constant.ServiceRegistryProtocol { |
| invoker = extension.GetProtocol(constant.RegistryProtocol).Refer(u) |
| } else { |
| invoker = extension.GetProtocol(u.Protocol).Refer(u) |
| } |
| |
| if rc.URL != "" { |
| invoker = protocolwrapper.BuildInvokerChain(invoker, constant.ReferenceFilterKey) |
| } |
| |
| invokers[i] = invoker |
| if u.Protocol == constant.RegistryProtocol { |
| regURL = u |
| } |
| } |
| |
| // TODO(hxmhlt): decouple from directory, config should not depend on directory module |
| if len(invokers) == 1 { |
| rc.invoker = invokers[0] |
| if rc.URL != "" { |
| hitClu := constant.ClusterKeyFailover |
| if u := rc.invoker.GetURL(); u != nil { |
| hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware) |
| } |
| cluster, err := extension.GetCluster(hitClu) |
| if err != nil { |
| panic(err) |
| } else { |
| rc.invoker = cluster.Join(static.NewDirectory(invokers)) |
| } |
| } |
| } else { |
| var hitClu string |
| if regURL != nil { |
| // for multi-subscription scenario, use 'zone-aware' policy by default |
| hitClu = constant.ClusterKeyZoneAware |
| } else { |
| // not a registry url, must be direct invoke. |
| hitClu = constant.ClusterKeyFailover |
| if u := invokers[0].GetURL(); u != nil { |
| hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware) |
| } |
| } |
| cluster, err := extension.GetCluster(hitClu) |
| if err != nil { |
| panic(err) |
| } else { |
| rc.invoker = cluster.Join(static.NewDirectory(invokers)) |
| } |
| } |
| |
| // publish consumer's metadata |
| publishServiceDefinition(cfgURL) |
| // create proxy |
| if info == nil { |
| if rc.Async { |
| var callback common.CallbackResponse |
| if asyncSrv, ok := srv.(common.AsyncCallbackService); ok { |
| callback = asyncSrv.CallBack |
| } |
| rc.pxy = extension.GetProxyFactory(rc.proxyFactory).GetAsyncProxy(rc.invoker, callback, cfgURL) |
| } else { |
| rc.pxy = extension.GetProxyFactory(rc.proxyFactory).GetProxy(rc.invoker, cfgURL) |
| } |
| rc.pxy.Implement(srv) |
| } |
| } |
| |
| func (rc *ReferenceConfig) CheckAvailable() bool { |
| if rc.invoker == nil { |
| logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", rc.InterfaceName) |
| return false |
| } |
| if !rc.invoker.IsAvailable() { |
| return false |
| } |
| return true |
| } |
| |
| // Implement |
| // @v is service provider implemented RPCService |
| func (rc *ReferenceConfig) Implement(v common.RPCService) { |
| if rc.pxy != nil { |
| rc.pxy.Implement(v) |
| } else if rc.info != nil { |
| rc.info.ClientInjectFunc(v, &Client{ |
| invoker: rc.invoker, |
| info: rc.info, |
| }) |
| } |
| } |
| |
| // GetRPCService gets RPCService from proxy |
| func (rc *ReferenceConfig) GetRPCService() common.RPCService { |
| return rc.pxy.Get() |
| } |
| |
| // GetProxy gets proxy |
| func (rc *ReferenceConfig) GetProxy() *proxy.Proxy { |
| return rc.pxy |
| } |
| |
| func (rc *ReferenceConfig) getURLMap() url.Values { |
| urlMap := url.Values{} |
| // first set user params |
| for k, v := range rc.Params { |
| urlMap.Set(k, v) |
| } |
| urlMap.Set(constant.InterfaceKey, rc.InterfaceName) |
| urlMap.Set(constant.TimestampKey, strconv.FormatInt(time.Now().Unix(), 10)) |
| urlMap.Set(constant.ClusterKey, rc.Cluster) |
| urlMap.Set(constant.LoadbalanceKey, rc.Loadbalance) |
| urlMap.Set(constant.RetriesKey, rc.Retries) |
| urlMap.Set(constant.GroupKey, rc.Group) |
| urlMap.Set(constant.VersionKey, rc.Version) |
| urlMap.Set(constant.GenericKey, rc.Generic) |
| urlMap.Set(constant.RegistryRoleKey, strconv.Itoa(common.CONSUMER)) |
| urlMap.Set(constant.ProvidedBy, rc.ProvidedBy) |
| urlMap.Set(constant.SerializationKey, rc.Serialization) |
| urlMap.Set(constant.TracingConfigKey, rc.TracingKey) |
| |
| urlMap.Set(constant.ReleaseKey, "dubbo-golang-"+constant.Version) |
| urlMap.Set(constant.SideKey, (common.RoleType(common.CONSUMER)).Role()) |
| |
| if len(rc.RequestTimeout) != 0 { |
| urlMap.Set(constant.TimeoutKey, rc.RequestTimeout) |
| } |
| // getty invoke async or sync |
| urlMap.Set(constant.AsyncKey, strconv.FormatBool(rc.Async)) |
| urlMap.Set(constant.StickyKey, strconv.FormatBool(rc.Sticky)) |
| |
| // applicationConfig info |
| if rc.application != nil { |
| urlMap.Set(constant.ApplicationKey, rc.application.Name) |
| urlMap.Set(constant.OrganizationKey, rc.application.Organization) |
| urlMap.Set(constant.NameKey, rc.application.Name) |
| urlMap.Set(constant.ModuleKey, rc.application.Module) |
| urlMap.Set(constant.AppVersionKey, rc.application.Version) |
| urlMap.Set(constant.OwnerKey, rc.application.Owner) |
| urlMap.Set(constant.EnvironmentKey, rc.application.Environment) |
| } |
| |
| // filter |
| defaultReferenceFilter := constant.DefaultReferenceFilters |
| if rc.Generic != "" { |
| defaultReferenceFilter = constant.GenericFilterKey + "," + defaultReferenceFilter |
| } |
| urlMap.Set(constant.ReferenceFilterKey, commonCfg.MergeValue(rc.Filter, "", defaultReferenceFilter)) |
| |
| for _, v := range rc.Methods { |
| urlMap.Set("methods."+v.Name+"."+constant.LoadbalanceKey, v.LoadBalance) |
| urlMap.Set("methods."+v.Name+"."+constant.RetriesKey, v.Retries) |
| urlMap.Set("methods."+v.Name+"."+constant.StickyKey, strconv.FormatBool(v.Sticky)) |
| if len(v.RequestTimeout) != 0 { |
| urlMap.Set("methods."+v.Name+"."+constant.TimeoutKey, v.RequestTimeout) |
| } |
| } |
| |
| return urlMap |
| } |
| |
| // todo: figure this out |
| //// GenericLoad ... |
| //func (rc *ReferenceConfig) GenericLoad(id string) { |
| // genericService := generic.NewGenericService(rc.id) |
| // config.SetConsumerService(genericService) |
| // rc.id = id |
| // rc.Refer(genericService) |
| // rc.Implement(genericService) |
| //} |
| |
| // GetInvoker get invoker from ReferenceConfigs |
| func (rc *ReferenceConfig) GetInvoker() protocol.Invoker { |
| return rc.invoker |
| } |
| |
| // postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfigs. |
| func (rc *ReferenceConfig) postProcessConfig(url *common.URL) { |
| for _, p := range extension.GetConfigPostProcessors() { |
| p.PostProcessReferenceConfig(url) |
| } |
| } |
| |
| func publishServiceDefinition(url *common.URL) { |
| localService, err := extension.GetLocalMetadataService(constant.DefaultKey) |
| if err != nil { |
| logger.Warnf("get local metadata service failed, please check if you have imported _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"") |
| return |
| } |
| localService.PublishServiceDefinition(url) |
| if url.GetParam(constant.MetadataTypeKey, "") != constant.RemoteMetadataStorageType { |
| return |
| } |
| if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil { |
| remoteMetadataService.PublishServiceDefinition(url) |
| } |
| } |
| |
| type ReferenceOption func(*ReferenceConfig) |