blob: 3536fc9d2dd079fa12669ceaf7a816cf5bbcd9c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"strconv"
"time"
)
import (
"github.com/creasty/defaults"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/proxy"
"dubbo.apache.org/dubbo-go/v3/registry"
)
// ReferenceOptions collects the configuration and package-internal state for a
// single service reference. The exported fields are configuration objects that
// ReferenceOption functions modify; the unexported fields are filled in inside
// this package.
type ReferenceOptions struct {
// Reference is the reference-level configuration targeted by the With* options.
Reference *global.ReferenceConfig
// Consumer is the consumer-level configuration; init uses Consumer.Protocol as a
// fallback when Reference.Protocol is empty.
Consumer *global.ConsumerConfig
// Metrics is the metrics configuration (replaced via setMetrics).
Metrics *global.MetricsConfig
// Otel is the OpenTelemetry configuration (replaced via setOtel).
Otel *global.OtelConfig
// pxy, id, invoker, urls and info are not assigned anywhere in the code visible
// in this part of the file — presumably they are populated while the reference
// is being established; TODO confirm against the referring code.
pxy *proxy.Proxy
id string
invoker protocol.Invoker
urls []*common.URL
// metaDataType is copied from applicationCompat.MetadataType during init.
metaDataType string
info *ClientInfo
// methodsCompat holds one initialized config.MethodConfig per entry of
// Reference.Methods, built during init via compatMethodConfig.
methodsCompat []*config.MethodConfig
// applicationCompat supplies Group/Version fallbacks and the metadata type during init.
applicationCompat *config.ApplicationConfig
// registriesCompat is narrowed during init to the registries named by Reference.RegistryIDs.
registriesCompat map[string]*config.RegistryConfig
}
// defaultReferenceOptions builds a ReferenceOptions whose Reference, Metrics and
// Otel fields are pre-populated with the defaults from the global package.
// All other fields keep their zero value.
func defaultReferenceOptions() *ReferenceOptions {
	refOpts := new(ReferenceOptions)
	refOpts.Reference = global.DefaultReferenceConfig()
	refOpts.Metrics = global.DefaultMetricsConfig()
	refOpts.Otel = global.DefaultOtelConfig()
	return refOpts
}
// init applies opts to refOpts in order, runs defaults.Set on the result and
// then normalizes the reference configuration:
//
//   - Group, Version and the metadata type are inherited from applicationCompat
//     when it is present and the reference left Group/Version empty;
//   - every entry of Reference.Methods is converted with compatMethodConfig and
//     initialized;
//   - an empty Cluster becomes constant.ClusterKeyFailover;
//   - registriesCompat is narrowed to the ids listed in Reference.RegistryIDs
//     (all known ids when the list is empty);
//   - an empty Protocol becomes Consumer.Protocol when set, otherwise "tri";
//   - an empty Serialization becomes constant.ProtobufSerialization.
//
// It returns the first error produced by defaults.Set, by a method's Init, or
// by commonCfg.Verify.
func (refOpts *ReferenceOptions) init(opts ...ReferenceOption) error {
	for _, opt := range opts {
		opt(refOpts)
	}
	if err := defaults.Set(refOpts); err != nil {
		return err
	}

	ref := refOpts.Reference

	// Inherit application-level values the reference did not set itself.
	if app := refOpts.applicationCompat; app != nil {
		refOpts.metaDataType = app.MetadataType
		if ref.Group == "" {
			ref.Group = app.Group
		}
		if ref.Version == "" {
			ref.Version = app.Version
		}
	}

	// init method
	if methods := ref.Methods; len(methods) > 0 {
		refOpts.methodsCompat = make([]*config.MethodConfig, len(methods))
		for i, method := range methods {
			refOpts.methodsCompat[i] = compatMethodConfig(method)
			if err := refOpts.methodsCompat[i].Init(); err != nil {
				return err
			}
		}
	}

	// init cluster
	if ref.Cluster == "" {
		ref.Cluster = constant.ClusterKeyFailover
	}

	// init registries
	if regs := refOpts.registriesCompat; len(regs) > 0 {
		if len(ref.RegistryIDs) == 0 {
			// Length 0 with capacity len(regs): a non-zero length here would
			// leave len(regs) empty-string ids in front of the appended keys.
			ids := make([]string, 0, len(regs))
			for id := range regs {
				ids = append(ids, id)
			}
			ref.RegistryIDs = ids
		}
		ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs)

		// Keep only the registries this reference actually names.
		selected := make(map[string]*config.RegistryConfig, len(ref.RegistryIDs))
		for _, id := range ref.RegistryIDs {
			if reg, ok := regs[id]; ok {
				selected[id] = reg
			}
		}
		refOpts.registriesCompat = selected
	}

	// init protocol
	if ref.Protocol == "" {
		ref.Protocol = "tri"
		if refOpts.Consumer != nil && refOpts.Consumer.Protocol != "" {
			ref.Protocol = refOpts.Consumer.Protocol
		}
	}

	// init serialization
	if ref.Serialization == "" {
		ref.Serialization = constant.ProtobufSerialization
	}

	return commonCfg.Verify(refOpts)
}
// ReferenceOption mutates a ReferenceOptions value; options are applied in order
// by (*ReferenceOptions).init.
type ReferenceOption func(*ReferenceOptions)
// ---------- For user ----------
// WithCheck sets Reference.Check to a pointer to true.
func WithCheck() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		enabled := new(bool)
		*enabled = true
		refOpts.Reference.Check = enabled
	}
}
// WithURL sets Reference.URL to url.
func WithURL(url string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.URL = url
	}
}
// WithFilter sets Reference.Filter to filter.
func WithFilter(filter string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.Filter = filter
	}
}
// WithRegistryIDs sets Reference.RegistryIDs to registryIDs. Calling it with no
// ids leaves the current value untouched.
func WithRegistryIDs(registryIDs ...string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		if len(registryIDs) == 0 {
			return
		}
		refOpts.Reference.RegistryIDs = registryIDs
	}
}
// ========== Cluster Strategy ==========
// WithClusterAvailable sets Reference.Cluster to constant.ClusterKeyAvailable.
func WithClusterAvailable() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyAvailable
	}
}
// WithClusterBroadcast sets Reference.Cluster to constant.ClusterKeyBroadcast.
func WithClusterBroadcast() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyBroadcast
	}
}
// WithClusterFailBack sets Reference.Cluster to constant.ClusterKeyFailback.
func WithClusterFailBack() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyFailback
	}
}
// WithClusterFailFast sets Reference.Cluster to constant.ClusterKeyFailfast.
func WithClusterFailFast() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyFailfast
	}
}
// WithClusterFailOver sets Reference.Cluster to constant.ClusterKeyFailover.
func WithClusterFailOver() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyFailover
	}
}
// WithClusterFailSafe sets Reference.Cluster to constant.ClusterKeyFailsafe.
func WithClusterFailSafe() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyFailsafe
	}
}
// WithClusterForking sets Reference.Cluster to constant.ClusterKeyForking.
func WithClusterForking() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyForking
	}
}
// WithClusterZoneAware sets Reference.Cluster to constant.ClusterKeyZoneAware.
func WithClusterZoneAware() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyZoneAware
	}
}
// WithClusterAdaptiveService sets Reference.Cluster to
// constant.ClusterKeyAdaptiveService.
func WithClusterAdaptiveService() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Cluster = constant.ClusterKeyAdaptiveService
	}
}
// WithCluster sets Reference.Cluster to the given cluster key verbatim.
func WithCluster(cluster string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.Cluster = cluster
	}
}
// ========== LoadBalance Strategy ==========
// WithLoadBalanceConsistentHashing sets Reference.Loadbalance to
// constant.LoadBalanceKeyConsistentHashing.
func WithLoadBalanceConsistentHashing() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Loadbalance = constant.LoadBalanceKeyConsistentHashing
	}
}
// WithLoadBalanceLeastActive sets Reference.Loadbalance to
// constant.LoadBalanceKeyLeastActive.
func WithLoadBalanceLeastActive() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive
	}
}
// WithLoadBalanceRandom sets Reference.Loadbalance to
// constant.LoadBalanceKeyRandom.
func WithLoadBalanceRandom() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Loadbalance = constant.LoadBalanceKeyRandom
	}
}
// WithLoadBalanceRoundRobin sets Reference.Loadbalance to
// constant.LoadBalanceKeyRoundRobin.
func WithLoadBalanceRoundRobin() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Loadbalance = constant.LoadBalanceKeyRoundRobin
	}
}
// WithLoadBalanceP2C sets Reference.Loadbalance to constant.LoadBalanceKeyP2C.
func WithLoadBalanceP2C() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Loadbalance = constant.LoadBalanceKeyP2C
	}
}
// WithLoadBalance sets Reference.Loadbalance to the given key verbatim.
func WithLoadBalance(lb string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.Loadbalance = lb
	}
}
// WithRetries stores retries, formatted as a base-10 string, in
// Reference.Retries.
func WithRetries(retries int) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		formatted := strconv.FormatInt(int64(retries), 10)
		refOpts.Reference.Retries = formatted
	}
}
// WithGroup sets Reference.Group to group.
func WithGroup(group string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.Group = group
	}
}
// WithVersion sets Reference.Version to version.
func WithVersion(version string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.Version = version
	}
}
// WithSerializationJSON sets Reference.Serialization to
// constant.JSONSerialization.
func WithSerializationJSON() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Serialization = constant.JSONSerialization
	}
}
// WithSerialization sets Reference.Serialization to the given name verbatim.
func WithSerialization(serialization string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.Serialization = serialization
	}
}
// WithProvidedBy sets Reference.ProvidedBy to providedBy.
func WithProvidedBy(providedBy string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.ProvidedBy = providedBy
	}
}
// WithAsync sets Reference.Async to true.
func WithAsync() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Async = true
	}
}
// WithParams replaces Reference.Params with a copy of params. A nil or empty
// map is ignored and leaves the current Params untouched.
//
// The map is copied rather than stored directly: otherwise a later WithParam
// would write into the caller's map, and applying the same option to several
// references would make them all share one Params map.
func WithParams(params map[string]string) ReferenceOption {
	return func(opts *ReferenceOptions) {
		if len(params) == 0 {
			return
		}
		copied := make(map[string]string, len(params))
		for k, v := range params {
			copied[k] = v
		}
		opts.Reference.Params = copied
	}
}
// WithGeneric sets Reference.Generic to the string "true".
func WithGeneric() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Generic = "true"
	}
}
// WithSticky sets Reference.Sticky to true.
func WithSticky() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Sticky = true
	}
}
// ========== Protocol to consume ==========
// WithProtocolDubbo sets Reference.Protocol to constant.Dubbo.
func WithProtocolDubbo() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Protocol = constant.Dubbo
	}
}
// WithProtocolTriple sets Reference.Protocol to "tri".
func WithProtocolTriple() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Protocol = "tri"
	}
}
// WithProtocolJsonRPC sets Reference.Protocol to "jsonrpc".
func WithProtocolJsonRPC() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.Protocol = "jsonrpc"
	}
}
// WithProtocol sets Reference.Protocol to the given protocol name verbatim.
func WithProtocol(protocol string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.Protocol = protocol
	}
}
// WithRequestTimeout stores timeout, rendered with time.Duration.String, in
// Reference.RequestTimeout.
func WithRequestTimeout(timeout time.Duration) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		rendered := timeout.String()
		refOpts.Reference.RequestTimeout = rendered
	}
}
// WithForceTag sets Reference.ForceTag to true.
func WithForceTag() ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference.ForceTag = true
	}
}
// WithMeshProviderPort sets Reference.MeshProviderPort to port.
func WithMeshProviderPort(port int) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.MeshProviderPort = port
	}
}
// WithMethod builds a method configuration from opts (via
// config.NewMethodOptions, evaluated once when WithMethod is called) and
// returns an option that appends it to Reference.Methods.
func WithMethod(opts ...config.MethodOption) ReferenceOption {
	methodOpts := config.NewMethodOptions(opts...)
	// The closure parameter is named refOpts so it no longer shadows the
	// variadic opts above.
	return func(refOpts *ReferenceOptions) {
		// append works on a nil or empty slice, so no pre-allocation is needed.
		refOpts.Reference.Methods = append(refOpts.Reference.Methods, methodOpts.Method)
	}
}
// WithParam stores the pair k=v in Reference.Params, creating the map first if
// it is nil.
func WithParam(k, v string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		if ref.Params == nil {
			ref.Params = map[string]string{}
		}
		ref.Params[k] = v
	}
}
// ---------- For framework ----------
// These functions should not be invoked by users
// setReference replaces the whole Reference configuration with reference.
func setReference(reference *global.ReferenceConfig) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Reference = reference
	}
}
// setInterfaceName sets Reference.InterfaceName to interfaceName.
func setInterfaceName(interfaceName string) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		ref := refOpts.Reference
		ref.InterfaceName = interfaceName
	}
}
// setApplicationCompat stores app in the applicationCompat field.
func setApplicationCompat(app *config.ApplicationConfig) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.applicationCompat = app
	}
}
// setRegistriesCompat stores regs in the registriesCompat field.
func setRegistriesCompat(regs map[string]*config.RegistryConfig) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.registriesCompat = regs
	}
}
// setConsumer replaces the Consumer configuration with consumer.
func setConsumer(consumer *global.ConsumerConfig) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Consumer = consumer
	}
}
// setMetrics replaces the Metrics configuration with mc.
func setMetrics(mc *global.MetricsConfig) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Metrics = mc
	}
}
// setOtel replaces the Otel configuration with oc.
func setOtel(oc *global.OtelConfig) ReferenceOption {
	return func(refOpts *ReferenceOptions) {
		refOpts.Otel = oc
	}
}
// ClientOptions collects the client-wide configuration that ClientOption
// functions modify and that (*ClientOptions).init normalizes.
type ClientOptions struct {
// Consumer is the consumer-level configuration (protocol, filter, check,
// request timeout and registry ids are set on it by options in this file).
Consumer *global.ConsumerConfig
// Application is converted to applicationCompat during init when non-nil.
Application *global.ApplicationConfig
// Registries maps a registry id to its configuration; WithClientRegistry adds entries.
Registries map[string]*global.RegistryConfig
// Shutdown is handed to graceful_shutdown.Init at the end of init.
Shutdown *global.ShutdownConfig
// Metrics is the metrics configuration (replaced via SetClientMetrics).
Metrics *global.MetricsConfig
// Otel is the OpenTelemetry configuration (replaced via SetClientOtel).
Otel *global.OtelConfig
// overallReference holds the reference-level settings chosen through the
// WithClient* options (cluster, load balance, serialization, params, ...).
overallReference *global.ReferenceConfig
// applicationCompat is built from Application during init.
applicationCompat *config.ApplicationConfig
// registriesCompat is built during init from the registries named by Consumer.RegistryIDs.
registriesCompat map[string]*config.RegistryConfig
}
// defaultClientOptions builds a ClientOptions populated with the defaults from
// the global package and an empty, non-nil Registries map.
func defaultClientOptions() *ClientOptions {
	cliOpts := new(ClientOptions)
	cliOpts.Consumer = global.DefaultConsumerConfig()
	cliOpts.Registries = map[string]*global.RegistryConfig{}
	cliOpts.Application = global.DefaultApplicationConfig()
	cliOpts.Shutdown = global.DefaultShutdownConfig()
	cliOpts.Metrics = global.DefaultMetricsConfig()
	cliOpts.Otel = global.DefaultOtelConfig()
	cliOpts.overallReference = global.DefaultReferenceConfig()
	return cliOpts
}
// init applies opts to cliOpts in order, runs defaults.Set on the result and
// then normalizes the client configuration:
//
//   - Application, when non-nil, is converted with compatApplicationConfig and
//     initialized;
//   - when Registries is non-nil, the registries named by Consumer.RegistryIDs
//     (all known ids when the list is empty) are converted with
//     compatRegistryConfig and initialized;
//   - an empty cluster on overallReference becomes constant.ClusterKeyFailover;
//   - an empty Consumer.Protocol becomes "tri";
//   - an empty serialization on overallReference becomes
//     constant.ProtobufSerialization;
//   - graceful_shutdown.Init is called with the Shutdown configuration.
//
// It returns the first error produced by defaults.Set or by one of the Init
// calls, and nil otherwise.
func (cliOpts *ClientOptions) init(opts ...ClientOption) error {
	for _, opt := range opts {
		opt(cliOpts)
	}
	if err := defaults.Set(cliOpts); err != nil {
		return err
	}

	con := cliOpts.Consumer

	// init application
	if application := cliOpts.Application; application != nil {
		cliOpts.applicationCompat = compatApplicationConfig(application)
		if err := cliOpts.applicationCompat.Init(); err != nil {
			return err
		}
	}

	// init registries
	if regs := cliOpts.Registries; regs != nil {
		cliOpts.registriesCompat = make(map[string]*config.RegistryConfig)
		if len(con.RegistryIDs) == 0 {
			// Length 0 with capacity len(regs): a non-zero length here would
			// leave len(regs) empty-string ids in front of the appended keys.
			ids := make([]string, 0, len(regs))
			for id := range regs {
				ids = append(ids, id)
			}
			con.RegistryIDs = ids
		}
		con.RegistryIDs = commonCfg.TranslateIds(con.RegistryIDs)

		for _, id := range con.RegistryIDs {
			reg, ok := regs[id]
			if !ok {
				continue
			}
			cliOpts.registriesCompat[id] = compatRegistryConfig(reg)
			if err := cliOpts.registriesCompat[id].Init(); err != nil {
				return err
			}
		}
	}

	// init cluster
	if cliOpts.overallReference.Cluster == "" {
		cliOpts.overallReference.Cluster = constant.ClusterKeyFailover
	}

	// init protocol
	if cliOpts.Consumer.Protocol == "" {
		cliOpts.Consumer.Protocol = "tri"
	}

	// init serialization
	if cliOpts.overallReference.Serialization == "" {
		cliOpts.overallReference.Serialization = constant.ProtobufSerialization
	}

	// todo(DMwangnima): is there any part that we should do compatibility processing?

	// init graceful_shutdown
	graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(cliOpts.Shutdown))
	return nil
}
// ClientOption mutates a ClientOptions value; options are applied in order by
// (*ClientOptions).init.
type ClientOption func(*ClientOptions)
// WithClientCheck sets Consumer.Check to true.
func WithClientCheck() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Consumer.Check = true
	}
}
// WithClientURL sets the URL of the client-wide reference configuration.
func WithClientURL(url string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.URL = url
	}
}
// WithClientFilter sets Consumer.Filter to filter.
//
// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithClientFilter(filter string) ClientOption {
	return func(cliOpts *ClientOptions) {
		// todo: move this to overallReference
		consumer := cliOpts.Consumer
		consumer.Filter = filter
	}
}
// WithClientRegistryIDs sets Consumer.RegistryIDs to registryIDs. Calling it
// with no ids leaves the current value untouched.
//
// todo(DMwangnima): think about a more ideal configuration style
func WithClientRegistryIDs(registryIDs ...string) ClientOption {
	return func(cliOpts *ClientOptions) {
		if len(registryIDs) == 0 {
			return
		}
		cliOpts.Consumer.RegistryIDs = registryIDs
	}
}
// WithClientRegistry builds a registry configuration from opts (via
// registry.NewOptions, evaluated once when WithClientRegistry is called) and
// returns an option that stores it in Registries under the built options' ID.
func WithClientRegistry(opts ...registry.Option) ClientOption {
	regOpts := registry.NewOptions(opts...)
	return func(cliOpts *ClientOptions) {
		// Registries can be nil if an earlier option (e.g. SetClientRegistries)
		// assigned a nil map; writing to a nil map would panic.
		if cliOpts.Registries == nil {
			cliOpts.Registries = make(map[string]*global.RegistryConfig)
		}
		cliOpts.Registries[regOpts.ID] = regOpts.Registry
	}
}
// WithClientShutdown builds a shutdown configuration from opts (via
// graceful_shutdown.NewOptions, evaluated once when WithClientShutdown is
// called) and returns an option that assigns it to Shutdown.
func WithClientShutdown(opts ...graceful_shutdown.Option) ClientOption {
	built := graceful_shutdown.NewOptions(opts...)
	return func(target *ClientOptions) {
		target.Shutdown = built.Shutdown
	}
}
// ========== Cluster Strategy ==========
// WithClientClusterAvailable sets the client-wide cluster key to
// constant.ClusterKeyAvailable.
func WithClientClusterAvailable() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyAvailable
	}
}
// WithClientClusterBroadcast sets the client-wide cluster key to
// constant.ClusterKeyBroadcast.
func WithClientClusterBroadcast() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyBroadcast
	}
}
// WithClientClusterFailBack sets the client-wide cluster key to
// constant.ClusterKeyFailback.
func WithClientClusterFailBack() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyFailback
	}
}
// WithClientClusterFailFast sets the client-wide cluster key to
// constant.ClusterKeyFailfast.
func WithClientClusterFailFast() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyFailfast
	}
}
// WithClientClusterFailOver sets the client-wide cluster key to
// constant.ClusterKeyFailover.
func WithClientClusterFailOver() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyFailover
	}
}
// WithClientClusterFailSafe sets the client-wide cluster key to
// constant.ClusterKeyFailsafe.
func WithClientClusterFailSafe() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyFailsafe
	}
}
// WithClientClusterForking sets the client-wide cluster key to
// constant.ClusterKeyForking.
func WithClientClusterForking() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyForking
	}
}
// WithClientClusterZoneAware sets the client-wide cluster key to
// constant.ClusterKeyZoneAware.
func WithClientClusterZoneAware() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyZoneAware
	}
}
// WithClientClusterAdaptiveService sets the client-wide cluster key to
// constant.ClusterKeyAdaptiveService.
func WithClientClusterAdaptiveService() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Cluster = constant.ClusterKeyAdaptiveService
	}
}
// WithClientClusterStrategy sets the client-wide cluster key to strategy
// verbatim.
func WithClientClusterStrategy(strategy string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.Cluster = strategy
	}
}
// ========== LoadBalance Strategy ==========
// WithClientLoadBalanceConsistentHashing sets the client-wide load-balance key
// to constant.LoadBalanceKeyConsistentHashing.
func WithClientLoadBalanceConsistentHashing() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Loadbalance = constant.LoadBalanceKeyConsistentHashing
	}
}
// WithClientLoadBalanceLeastActive sets the client-wide load-balance key to
// constant.LoadBalanceKeyLeastActive.
func WithClientLoadBalanceLeastActive() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Loadbalance = constant.LoadBalanceKeyLeastActive
	}
}
// WithClientLoadBalanceRandom sets the client-wide load-balance key to
// constant.LoadBalanceKeyRandom.
func WithClientLoadBalanceRandom() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Loadbalance = constant.LoadBalanceKeyRandom
	}
}
// WithClientLoadBalanceRoundRobin sets the client-wide load-balance key to
// constant.LoadBalanceKeyRoundRobin.
func WithClientLoadBalanceRoundRobin() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Loadbalance = constant.LoadBalanceKeyRoundRobin
	}
}
// WithClientLoadBalanceP2C sets the client-wide load-balance key to
// constant.LoadBalanceKeyP2C.
func WithClientLoadBalanceP2C() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Loadbalance = constant.LoadBalanceKeyP2C
	}
}
// WithClientLoadBalance sets the client-wide load-balance key to lb verbatim.
func WithClientLoadBalance(lb string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.Loadbalance = lb
	}
}
// WithClientRetries stores retries, formatted as a base-10 string, in the
// client-wide reference configuration.
func WithClientRetries(retries int) ClientOption {
	return func(cliOpts *ClientOptions) {
		formatted := strconv.FormatInt(int64(retries), 10)
		cliOpts.overallReference.Retries = formatted
	}
}
// WithClientGroup sets the group of the client-wide reference configuration.
//
// is this needed?
func WithClientGroup(group string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.Group = group
	}
}
// WithClientVersion sets the version of the client-wide reference
// configuration.
//
// is this needed?
func WithClientVersion(version string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.Version = version
	}
}
// WithClientSerializationJSON sets the client-wide serialization to
// constant.JSONSerialization.
func WithClientSerializationJSON() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Serialization = constant.JSONSerialization
	}
}
// WithClientSerialization sets the client-wide serialization to ser verbatim.
func WithClientSerialization(ser string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.Serialization = ser
	}
}
// WithClientProvidedBy sets ProvidedBy on the client-wide reference
// configuration.
func WithClientProvidedBy(providedBy string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.ProvidedBy = providedBy
	}
}
// todo(DMwangnima): implement this functionality
// func WithAsync() ClientOption {
// return func(opts *ClientOptions) {
// opts.Consumer.Async = true
// }
// }
// WithClientParams replaces the Params of the client-wide reference
// configuration with a copy of params. A nil or empty map is ignored and leaves
// the current Params untouched.
//
// The map is copied rather than stored directly: otherwise a later
// WithClientParam would write into the caller's map, and applying the same
// option to several clients would make them all share one Params map.
func WithClientParams(params map[string]string) ClientOption {
	return func(opts *ClientOptions) {
		if len(params) == 0 {
			return
		}
		copied := make(map[string]string, len(params))
		for k, v := range params {
			copied[k] = v
		}
		opts.overallReference.Params = copied
	}
}
// WithClientParam stores the pair k=v in the Params of the client-wide
// reference configuration, creating the map (capacity hint 8) if it is nil.
func WithClientParam(k, v string) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		if ref.Params == nil {
			ref.Params = make(map[string]string, 8)
		}
		ref.Params[k] = v
	}
}
// todo(DMwangnima): implement this functionality
// func WithClientGeneric(generic bool) ClientOption {
// return func(opts *ClientOptions) {
// if generic {
// opts.Consumer.Generic = "true"
// } else {
// opts.Consumer.Generic = "false"
// }
// }
// }
// WithClientSticky sets Sticky to true on the client-wide reference
// configuration.
func WithClientSticky() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.Sticky = true
	}
}
// ========== Protocol to consume ==========
// WithClientProtocolDubbo sets Consumer.Protocol to constant.Dubbo.
func WithClientProtocolDubbo() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Consumer.Protocol = constant.Dubbo
	}
}
// WithClientProtocolTriple sets Consumer.Protocol to "tri".
func WithClientProtocolTriple() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Consumer.Protocol = "tri"
	}
}
// WithClientProtocolJsonRPC sets Consumer.Protocol to "jsonrpc".
func WithClientProtocolJsonRPC() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Consumer.Protocol = "jsonrpc"
	}
}
// WithClientProtocol sets Consumer.Protocol to the given protocol name
// verbatim.
func WithClientProtocol(protocol string) ClientOption {
	return func(cliOpts *ClientOptions) {
		consumer := cliOpts.Consumer
		consumer.Protocol = protocol
	}
}
// WithClientRequestTimeout stores timeout, rendered with time.Duration.String,
// in Consumer.RequestTimeout.
func WithClientRequestTimeout(timeout time.Duration) ClientOption {
	return func(cliOpts *ClientOptions) {
		rendered := timeout.String()
		cliOpts.Consumer.RequestTimeout = rendered
	}
}
// WithClientForceTag sets ForceTag to true on the client-wide reference
// configuration.
func WithClientForceTag() ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.overallReference.ForceTag = true
	}
}
// WithClientMeshProviderPort sets MeshProviderPort on the client-wide reference
// configuration.
func WithClientMeshProviderPort(port int) ClientOption {
	return func(cliOpts *ClientOptions) {
		ref := cliOpts.overallReference
		ref.MeshProviderPort = port
	}
}
// SetClientRegistries replaces the whole Registries map with regs.
func SetClientRegistries(regs map[string]*global.RegistryConfig) ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Registries = regs
	}
}
// SetApplication replaces the Application configuration with application.
func SetApplication(application *global.ApplicationConfig) ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Application = application
	}
}
// SetClientConsumer replaces the Consumer configuration with consumer.
func SetClientConsumer(consumer *global.ConsumerConfig) ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Consumer = consumer
	}
}
// SetClientShutdown replaces the Shutdown configuration with shutdown.
func SetClientShutdown(shutdown *global.ShutdownConfig) ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Shutdown = shutdown
	}
}
// SetClientMetrics replaces the Metrics configuration with metrics.
func SetClientMetrics(metrics *global.MetricsConfig) ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Metrics = metrics
	}
}
// SetClientOtel replaces the Otel configuration with otel.
func SetClientOtel(otel *global.OtelConfig) ClientOption {
	return func(cliOpts *ClientOptions) {
		cliOpts.Otel = otel
	}
}
// CallOptions holds settings that apply to one specific call. Both values are
// kept in string form.
//
// todo: need to be consistent with MethodConfig
type CallOptions struct {
// RequestTimeout is the textual form of a time.Duration (see WithCallRequestTimeout).
RequestTimeout string
// Retries is the decimal textual form of an int (see WithCallRetries).
Retries string
}
// CallOption mutates a CallOptions value.
type CallOption func(*CallOptions)
// newDefaultCallOptions returns a fresh CallOptions with every field at its
// zero value.
func newDefaultCallOptions() *CallOptions {
	return new(CallOptions)
}
// WithCallRequestTimeout sets the maximum waiting time for one specific call.
// The duration is stored in its time.Duration.String form. It only works for
// the 'tri' and 'dubbo' protocols.
func WithCallRequestTimeout(timeout time.Duration) CallOption {
	return func(callOpts *CallOptions) {
		rendered := timeout.String()
		callOpts.RequestTimeout = rendered
	}
}
// WithCallRetries sets the maximum number of retries on request failure for one
// specific call. The count is stored as a base-10 string. It only works for the
// 'tri' and 'dubbo' protocols.
func WithCallRetries(retries int) CallOption {
	return func(callOpts *CallOptions) {
		formatted := strconv.FormatInt(int64(retries), 10)
		callOpts.Retries = formatted
	}
}