blob: 0998e3d5b09db7c004cf53321ec4ab2b7cee38ad [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package dubbo
import (
import (
_ ""
_ ""
_ ""
_ ""
_ ""
_ ""
_ ""
_ ""
dg ""
_ ""
_ ""
_ ""
_ ""
_ ""
hessian ""
fc ""
import (
cst ""
// Fully-qualified Java class names exposed by this package.
// TODO java class name elem
const (
// JavaStringClassName is the Java class name "java.lang.String".
JavaStringClassName = "java.lang.String"
// JavaLangClassName is the Java class name "java.lang.Long".
// NOTE(review): the identifier says "Lang" while the value is Long; presumably a
// typo that is kept because the name is exported — confirm before renaming.
JavaLangClassName = "java.lang.Long"
const (
// defaultDubboProtocol is the registry protocol Apply falls back to when a
// registry entry has an empty Protocol.
defaultDubboProtocol = "zookeeper"
// traceNameDubbogoClient is the tracer name Call passes to otel.Tracer.
traceNameDubbogoClient = "dubbogo-client"
// spanNameDubbogoClient is the name of the span Call starts for each invocation.
spanNameDubbogoClient = "DUBBOGO CLIENT"
// NOTE(review): the spanTag* keys below are not referenced in the visible code;
// presumably they are span attribute keys — confirm against the full file.
spanTagMethod = "method"
spanTagType = "type"
spanTagValues = "values"
var (
// dubboClient is the package-level client returned by SingletonDubboClient and
// replaced by InitDefaultDubboClient.
dubboClient *Client
// onceClient makes the lazy creation in SingletonDubboClient run at most once.
onceClient = sync.Once{}
// defaultApplication holds the application metadata for this gateway.
// NOTE(review): not referenced in the visible code — confirm where it is consumed.
defaultApplication = &dg.ApplicationConfig{
Organization: "dubbo-go-pixiu",
Name: "Dubbogo Pixiu",
Module: "dubbogo Pixiu",
Version: config.Version,
Owner: "Dubbogo Pixiu",
Environment: "dev",
// Client is a dubbo client used to make generic invocations against dubbo services.
type Client struct {
// lock guards GenericServicePool.
lock sync.RWMutex
// GenericServicePool caches generic services, keyed by the string built in apiKey.
GenericServicePool map[string]*generic.GenericService
// dubboProxyConfig is the proxy configuration installed through SetConfig.
dubboProxyConfig *DubboProxyConfig
// rootConfig is the dubbo-go root config that Apply builds and stores.
rootConfig *dg.RootConfig
// SingletonDubboClient singleton dubbo clent
func SingletonDubboClient() *Client {
if dubboClient == nil {
onceClient.Do(func() {
dubboClient = NewDubboClient()
return dubboClient
// InitDefaultDubboClient init default dubbo client
func InitDefaultDubboClient(dpc *DubboProxyConfig) {
dubboClient = NewDubboClient()
if err := dubboClient.Apply(); err != nil {
logger.Warnf("dubbo client apply error %s", err)
// NewDubboClient create dubbo client
func NewDubboClient() *Client {
return &Client{
lock: sync.RWMutex{},
GenericServicePool: make(map[string]*generic.GenericService, 4),
// SetConfig set config
func (dc *Client) SetConfig(dpc *DubboProxyConfig) {
dc.dubboProxyConfig = dpc
// Apply init dubbo, config mapping can do here
func (dc *Client) Apply() error {
rootConfigBuilder := dg.NewRootConfigBuilder()
if dc.dubboProxyConfig != nil && dc.dubboProxyConfig.Registries != nil {
for k, v := range dc.dubboProxyConfig.Registries {
if len(v.Protocol) == 0 {
logger.Warnf("can not find registry protocol config, use default type 'zookeeper'")
v.Protocol = defaultDubboProtocol
rootConfigBuilder.AddRegistry(k, &dg.RegistryConfig{
Protocol: v.Protocol,
Address: v.Address,
Timeout: v.Timeout,
Username: v.Username,
Password: v.Password,
Namespace: v.Namespace,
Group: v.Group,
rootConfig := rootConfigBuilder.Build()
if err := dg.Load(dg.WithRootConfig(rootConfig)); err != nil {
dc.rootConfig = rootConfig
return nil
// Close clear GenericServicePool.
func (dc *Client) Close() error {
defer dc.lock.Unlock()
for k := range dc.GenericServicePool {
delete(dc.GenericServicePool, k)
return nil
// Call maps the request parameters, looks up (or creates) the generic service
// for the request's integration config, and invokes the configured method on it.
// It returns the invocation result, or the first error from parameter mapping
// or from the invocation itself.
func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
// if GET with no args, values would be nil
values, err := dc.genericArgs(req)
if err != nil {
return nil, err
// The mapped parameters must come back as a *dubboTarget.
target, ok := values.(*dubboTarget)
if !ok {
return nil, errors.New("map parameters failed")
dm := req.API.Method.IntegrationRequest
method := dm.Method
// Defaults used when there is no target: no types and no values.
types := []string{}
vals := []hessian.Object{}
// NOTE(review): finalValues is assigned below but never read in the visible
// code — confirm whether it was meant to be attached to the span.
finalValues := []byte{}
if target != nil {
logger.Debugf("[dubbo-go-pixiu] dubbo invoke, method:%s, types:%s, reqData:%v", method, target.Types, target.Values)
types = target.Types
// Copy the target values into a []hessian.Object for the generic invoke.
vals = make([]hessian.Object, len(target.Values))
for i, v := range target.Values {
vals[i] = v
// This err shadows the named result; a marshal failure is only logged.
var err error
finalValues, err = json.Marshal(vals)
if err != nil {
logger.Warnf("[dubbo-go-pixiu] reqData convert to string failed: %v", err)
} else {
// No target: log the invocation with nil types and values.
logger.Debugf("[dubbo-go-pixiu] dubbo invoke, method:%s, types:%s, reqData:%v", method, nil, nil)
gs := dc.Get(dm)
// Start a span for this invocation; it ends when Call returns.
tr := otel.Tracer(traceNameDubbogoClient)
ctx, span := tr.Start(req.Context, spanNameDubbogoClient)
defer span.End()
// Inject the trace context into a map carrier manually and pass it along
// in the context under constant.AttachmentKey.
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
ctxWithAttachment := context.WithValue(ctx, constant.AttachmentKey, map[string]string(carrier))
rst, err := gs.Invoke(ctxWithAttachment, method, types, vals)
if err != nil {
// TODO: decide on a status code once it is known what dubbo returns on
// timeout; for now the error is returned to the caller unchanged.
return nil, err
logger.Debugf("[dubbo-go-pixiu] dubbo client resp:%v", rst)
return rst, nil
func (dc *Client) genericArgs(req *client.Request) (interface{}, error) {
values, err := dc.MapParams(req)
if err != nil {
return nil, err
return values, nil
// MapParams params mapping to api.
func (dc *Client) MapParams(req *client.Request) (interface{}, error) {
r := req.API.Method.IntegrationRequest
values := newDubboTarget(r.MappingParams)
if dc.dubboProxyConfig != nil && dc.dubboProxyConfig.IsDefaultMap {
values = newDubboTarget(defaultMappingParams)
for _, mappingParam := range r.MappingParams {
source, _, err := client.ParseMapSource(mappingParam.Name)
if err != nil {
return nil, err
if mapper, ok := mappers[source]; ok {
if err := mapper.Map(mappingParam, req, values, buildOption(mappingParam)); err != nil {
return nil, err
return values, nil
func buildOption(conf fc.MappingParam) client.RequestOption {
var opt client.RequestOption
isGeneric, mapToType := getGenericMapTo(conf.MapTo)
if isGeneric {
opt = DefaultMapOption[mapToType]
return opt
func (dc *Client) get(key string) *generic.GenericService {
defer dc.lock.RUnlock()
return dc.GenericServicePool[key]
func (dc *Client) check(key string) bool {
defer dc.lock.RUnlock()
if _, ok := dc.GenericServicePool[key]; ok {
return true
return false
// Get find a dubbo GenericService
func (dc *Client) Get(ir fc.IntegrationRequest) *generic.GenericService {
key := apiKey(&ir)
if dc.check(key) {
return dc.get(key)
return dc.create(key, ir)
func apiKey(ir *fc.IntegrationRequest) string {
dbc := ir.DubboBackendConfig
return strings.Join([]string{dbc.ClusterName, dbc.ApplicationName, dbc.Interface, dbc.Version, dbc.Group}, "_")
// create builds a generic reference for irequest, stores the resulting service
// in the pool under key, and returns it. If the pool already holds an entry for
// key, that entry is returned instead.
func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.GenericService {
// Collect every registry id from the root config and note whether any of
// them uses the nacos protocol.
useNacosRegister := false
registerIds := make([]string, 0)
for k, v := range dc.rootConfig.Registries {
registerIds = append(registerIds, k)
if v.Protocol == "nacos" {
useNacosRegister = true
// Generic reference over the dubbo protocol with the failover cluster.
// NOTE(review): dc.dubboProxyConfig is dereferenced here without a nil check —
// confirm it is always set before create is reached.
refConf := dg.ReferenceConfig{
InterfaceName: irequest.Interface,
Cluster: constant.ClusterKeyFailover,
RegistryIDs: registerIds,
Protocol: dubbo.DUBBO,
Generic: "true",
Version: irequest.DubboBackendConfig.Version,
Group: irequest.Group,
Loadbalance: dc.dubboProxyConfig.LoadBalance,
// Retries defaults to "3" when the backend config leaves it empty.
if len(irequest.DubboBackendConfig.Retries) == 0 {
refConf.Retries = "3"
} else {
refConf.Retries = irequest.DubboBackendConfig.Retries
// The request timeout comes from the proxy config when present, otherwise
// from cst.DefaultReqTimeout.
if dc.dubboProxyConfig.Timeout != nil {
refConf.RequestTimeout = dc.dubboProxyConfig.Timeout.RequestTimeoutStr
} else {
refConf.RequestTimeout = cst.DefaultReqTimeout.String()
logger.Debugf("[dubbo-go-pixiu] client dubbo timeout val %v", refConf.RequestTimeout)
// NOTE(review): no dc.lock.Lock() is visible before this deferred Unlock —
// confirm the write lock is taken here.
defer dc.lock.Unlock()
// Return the pooled service if key is already present.
if service, ok := dc.GenericServicePool[key]; ok {
return service
// NOTE(review): the body of this error branch is not visible here — confirm
// how a dg.Load failure is handled.
if err := dg.Load(dg.WithRootConfig(dc.rootConfig)); err != nil {
// The error returned by Init is deliberately discarded.
_ = refConf.Init(dc.rootConfig)
// sleep when first call to fetch enough service meta data from nacos
// todo: GenericLoad should guarantee it
if useNacosRegister {
time.Sleep(1000 * time.Millisecond)
// The unchecked type assertion panics if the RPC service is not a
// *generic.GenericService.
clientService := refConf.GetRPCService().(*generic.GenericService)
dc.GenericServicePool[key] = clientService
return clientService