blob: c3b6e7f4aa9deadaaa2989afee39473a295941c5 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package dubbo3
import (
import (
tripleConstant ""
triConfig ""
import (
var protocolOnce sync.Once
func init() {
// todo(DMwangnima): deprecated
//extension.SetProtocol(tripleConstant.TRIPLE, GetProtocol)
protocolOnce = sync.Once{}
var (
dubboProtocol *DubboProtocol
// DubboProtocol supports dubbo 3.0 protocol. It implements Protocol interface for dubbo protocol.
type DubboProtocol struct {
serverLock sync.Mutex
serviceMap *sync.Map // serviceMap is used to export multiple service by one server
serverMap map[string]*triple.TripleServer // serverMap stores all exported server
// NewDubboProtocol create a dubbo protocol.
func NewDubboProtocol() *DubboProtocol {
return &DubboProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
serverMap: make(map[string]*triple.TripleServer),
serviceMap: &sync.Map{},
// Export export dubbo3 service.
func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetURL()
serviceKey := url.ServiceKey()
exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap(), dp.serviceMap)
dp.SetExporterMap(serviceKey, exporter)
logger.Infof("[Triple Protocol] Export service: %s", url.String())
key := url.GetParam(constant.BeanNameKey, "")
var service interface{}
service = config.GetProviderService(key)
serializationType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
var triSerializationType tripleConstant.CodecType
if serializationType == constant.ProtobufSerialization {
m, ok := reflect.TypeOf(service).MethodByName("XXX_SetProxyImpl")
if !ok {
logger.Errorf("PB service with key = %s is not support XXX_SetProxyImpl to pb."+
"Please run go install to update your "+
"protoc-gen-go-triple and re-generate your pb file again.", key)
return nil
if invoker == nil {
panic(fmt.Sprintf("no invoker found for servicekey: %v", url.ServiceKey()))
in := []reflect.Value{reflect.ValueOf(service)}
in = append(in, reflect.ValueOf(invoker))
triSerializationType = tripleConstant.PBCodecName
} else {
valueOf := reflect.ValueOf(service)
typeOf := valueOf.Type()
numField := valueOf.NumMethod()
tripleService := &UnaryService{proxyImpl: invoker}
for i := 0; i < numField; i++ {
ft := typeOf.Method(i)
if ft.Name == "Reference" {
// get all method params type
typs := make([]reflect.Type, 0)
for j := 2; j < ft.Type.NumIn(); j++ {
typs = append(typs, ft.Type.In(j))
tripleService.setReqParamsTypes(ft.Name, typs)
service = tripleService
triSerializationType = tripleConstant.CodecType(serializationType)
dp.serviceMap.Store(url.GetParam(constant.InterfaceKey, ""), service)
// try start server
dp.openServer(url, triSerializationType)
return exporter
// Refer create dubbo3 service reference.
func (dp *DubboProtocol) Refer(url *common.URL) protocol.Invoker {
invoker, err := NewDubboInvoker(url)
if err != nil {
logger.Errorf("Refer url = %+v, with error = %s", url, err.Error())
return nil
logger.Infof("[Triple Protocol] Refer service: %s", url.String())
return invoker
// Destroy destroy dubbo3 service.
func (dp *DubboProtocol) Destroy() {
keyList := make([]string, 16)
defer dp.serverLock.Unlock()
// Stop all server
for k, _ := range dp.serverMap {
keyList = append(keyList, k)
for _, v := range keyList {
if server := dp.serverMap[v]; server != nil {
delete(dp.serverMap, v)
// Dubbo3GrpcService is gRPC service
type Dubbo3GrpcService interface {
// SetProxyImpl sets proxy.
XXX_SetProxyImpl(impl protocol.Invoker)
// GetProxyImpl gets proxy.
XXX_GetProxyImpl() protocol.Invoker
// ServiceDesc gets an RPC service's specification.
XXX_ServiceDesc() *grpc.ServiceDesc
type UnaryService struct {
proxyImpl protocol.Invoker
reqTypeMap sync.Map
func (d *UnaryService) setReqParamsTypes(methodName string, typ []reflect.Type) {
d.reqTypeMap.Store(methodName, typ)
func (d *UnaryService) GetReqParamsInterfaces(methodName string) ([]interface{}, bool) {
val, ok := d.reqTypeMap.Load(methodName)
if !ok {
return nil, false
typs := val.([]reflect.Type)
reqParamsInterfaces := make([]interface{}, 0, len(typs))
for _, typ := range typs {
reqParamsInterfaces = append(reqParamsInterfaces, reflect.New(typ).Interface())
return reqParamsInterfaces, true
func (d *UnaryService) InvokeWithArgs(ctx context.Context, methodName string, arguments []interface{}) (interface{}, error) {
dubboAttachment := make(map[string]interface{})
md, ok := metadata.FromIncomingContext(ctx)
if ok {
for k := range md {
dubboAttachment[k] = md.Get(k)[0]
res := d.proxyImpl.Invoke(ctx, invocation.NewRPCInvocation(methodName, arguments, dubboAttachment))
return res, res.Error()
// openServer open a dubbo3 server, if there is already a service using the same protocol, it returns directly.
func (dp *DubboProtocol) openServer(url *common.URL, tripleCodecType tripleConstant.CodecType) {
defer dp.serverLock.Unlock()
_, ok := dp.serverMap[url.Location]
if ok {
opts := []triConfig.OptionFunction{
tracingKey := url.GetParam(constant.TracingConfigKey, "")
if tracingKey != "" {
tracingConfig := config.GetTracingConfig(tracingKey)
if tracingConfig != nil {
if tracingConfig.ServiceName == "" {
tracingConfig.ServiceName = config.GetApplicationConfig().Name
switch tracingConfig.Name {
case "jaeger":
opts = append(opts, triConfig.WithJaegerConfig(
logger.Warnf("unsupported tracing name %s, now triple only support jaeger", tracingConfig.Name)
maxServerRecvMsgSize := constant.DefaultMaxServerRecvMsgSize
if recvMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxServerRecvMsgSize, "")); err == nil && recvMsgSize != 0 {
maxServerRecvMsgSize = int(recvMsgSize)
maxServerSendMsgSize := constant.DefaultMaxServerSendMsgSize
if sendMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxServerSendMsgSize, "")); err == nil && sendMsgSize != 0 {
maxServerSendMsgSize = int(sendMsgSize)
opts = append(opts, triConfig.WithGRPCMaxServerRecvMessageSize(maxServerRecvMsgSize))
opts = append(opts, triConfig.WithGRPCMaxServerSendMessageSize(maxServerSendMsgSize))
triOption := triConfig.NewTripleOption(opts...)
tlsConfig := config.GetRootConfig().TLSConfig
if tlsConfig != nil {
triOption.TLSCertFile = tlsConfig.TLSCertFile
triOption.TLSKeyFile = tlsConfig.TLSKeyFile
triOption.CACertFile = tlsConfig.CACertFile
triOption.TLSServerName = tlsConfig.TLSServerName
logger.Infof("Triple Server initialized the TLSConfig configuration")
_, ok = dp.ExporterMap().Load(url.ServiceKey())
if !ok {
panic("[DubboProtocol]" + url.Key() + "is not existing")
srv := triple.NewTripleServer(dp.serviceMap, triOption)
dp.serverMap[url.Location] = srv
// GetProtocol get a single dubbo3 protocol.
func GetProtocol() protocol.Protocol {
protocolOnce.Do(func() {
dubboProtocol = NewDubboProtocol()
return dubboProtocol