implement triple protocol server-side adaptation (#2378)
diff --git a/client/reference_config.go b/client/reference_config.go
index f425706..ca75dd7 100644
--- a/client/reference_config.go
+++ b/client/reference_config.go
@@ -40,7 +40,6 @@
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/proxy"
@@ -195,9 +194,6 @@
common.WithParamsValue(constant.MetadataTypeKey, rc.metaDataType),
)
- if info == nil {
- config.SetConsumerServiceByInterfaceName(rc.InterfaceName, srv)
- }
if rc.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
@@ -310,7 +306,10 @@
// create proxy
if info == nil {
if rc.Async {
- callback := config.GetCallback(rc.id)
+ var callback common.CallbackResponse
+ if asyncSrv, ok := srv.(common.AsyncCallbackService); ok {
+ callback = asyncSrv.CallBack
+ }
rc.pxy = extension.GetProxyFactory(rc.proxyFactory).GetAsyncProxy(rc.invoker, callback, cfgURL)
} else {
rc.pxy = extension.GetProxyFactory(rc.proxyFactory).GetProxy(rc.invoker, cfgURL)
diff --git a/common/config/utils.go b/common/config/utils.go
index ffe9000..76872fb 100644
--- a/common/config/utils.go
+++ b/common/config/utils.go
@@ -14,7 +14,6 @@
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
)
var validate *validator.Validate
@@ -95,11 +94,6 @@
return normalStr
}
-// ClientNameID unique identifier id for client
-func ClientNameID(config extension.Config, protocol, address string) string {
- return strings.Join([]string{config.Prefix(), protocol, address}, "-")
-}
-
func IsValid(addr string) bool {
return addr != "" && addr != constant.NotAvailable
}
diff --git a/config/consumer_config.go b/config/consumer_config.go
index 89170a3..fbcf5f3 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -162,6 +162,7 @@
}
refConfig.Refer(refRPCService)
refConfig.Implement(refRPCService)
+ SetConsumerServiceByInterfaceName(refConfig.InterfaceName, refRPCService)
}
for info, refRPCService := range GetClientInfoServicesMap() {
diff --git a/protocol/triple/internal/proto/greettriple/greet.triple.go b/protocol/triple/internal/proto/greettriple/greet.triple.go
index 841223a..eca9e9a 100644
--- a/protocol/triple/internal/proto/greettriple/greet.triple.go
+++ b/protocol/triple/internal/proto/greettriple/greet.triple.go
@@ -17,7 +17,7 @@
"dubbo.apache.org/dubbo-go/v3/config"
proto "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
triple_protocol "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
- "dubbo.apache.org/dubbo-go/v3/provider"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
// This is a compile-time assertion to ensure that this generated file and the connect package are
@@ -226,8 +226,8 @@
GreetServerStream(context.Context, *proto.GreetServerStreamRequest, GreetService_GreetServerStreamServer) error
}
-func ProvideGreetServiceHandler(pro *provider.Provider, hdlr GreetServiceHandler) error {
- return pro.Provide(hdlr, &GreetService_ServiceInfo)
+func RegisterGreetServiceHandler(srv *server.Server, hdlr GreetServiceHandler) error {
+ return srv.Register(hdlr, &GreetService_ServiceInfo)
}
type GreetService_GreetStreamServer interface {
@@ -299,10 +299,10 @@
return g.ServerStream.Send(msg)
}
-var GreetService_ServiceInfo = provider.ServiceInfo{
+var GreetService_ServiceInfo = server.ServiceInfo{
InterfaceName: "greet.GreetService",
ServiceType: (*GreetServiceHandler)(nil),
- Methods: []provider.MethodInfo{
+ Methods: []server.MethodInfo{
{
Name: "Greet",
Type: constant.CallUnary,
diff --git a/protocol/triple/internal/server/cmd/main.go b/protocol/triple/internal/server/cmd/main.go
index b38a896..f95118f 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/server/cmd/main.go
@@ -16,7 +16,7 @@
_ "dubbo.apache.org/dubbo-go/v3/imports"
greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/greettriple"
- "dubbo.apache.org/dubbo-go/v3/provider"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
type GreetConnectServer struct {
@@ -64,11 +64,11 @@
}
func main() {
- pro, err := provider.NewProvider()
+ srv, err := server.NewServer()
if err != nil {
panic(err)
}
- if err := greettriple.ProvideGreetServiceHandler(pro, &GreetConnectServer{}); err != nil {
+ if err := greettriple.RegisterGreetServiceHandler(srv, &GreetConnectServer{}); err != nil {
panic(err)
}
select {}
diff --git a/protocol/triple/server.go b/protocol/triple/server.go
index 3ac421c..9b22961 100644
--- a/protocol/triple/server.go
+++ b/protocol/triple/server.go
@@ -20,7 +20,6 @@
import (
"context"
"crypto/tls"
- "fmt"
"net/http"
"path"
"sync"
@@ -42,20 +41,10 @@
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
- proto "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
tri "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
-//// TripleService is implemented by user logic struct wrapping ProviderBase generated by protoc-gen-triple.
-//type TripleService interface {
-// // SetProxyImpl sets proxy. All invocation logics are embedded in impl.
-// SetProxyImpl(impl protocol.Invoker)
-// // GetProxyImpl gets proxy.
-// GetProxyImpl() protocol.Invoker
-// // BuildHandler receives user logic struct and handlerOption defined by triple to create handler.
-// // For now, impl is just used to finish type checking.
-// BuildHandler(impl interface{}, opts ...tri.HandlerOption) (string, http.Handler)
-//}
// Server is TRIPLE server
type Server struct {
httpServer *http.Server
@@ -67,12 +56,14 @@
}
// Start TRIPLE server
-func (s *Server) Start(url *common.URL) {
+func (s *Server) Start(invoker protocol.Invoker, info *server.ServiceInfo) {
var (
addr string
err error
+ url *common.URL
hanOpts []tri.HandlerOption
)
+ url = invoker.GetURL()
addr = url.Location
srv := &http.Server{
Addr: addr,
@@ -90,6 +81,7 @@
}
hanOpts = append(hanOpts, tri.WithSendMaxBytes(maxServerSendMsgSize))
+ // todo: implement interceptor
// If global trace instance was set, then server tracer instance
// can be get. If not, will return NoopTracer.
//tracer := opentracing.GlobalTracer()
@@ -111,7 +103,7 @@
if err != nil {
return
}
- logger.Infof("Grpc Server initialized the TLSConfig configuration")
+ logger.Infof("Triple Server initialized the TLSConfig configuration")
}
srv.TLSConfig = cfg
@@ -121,16 +113,18 @@
s.httpServer = srv
go func() {
- providerServices := config.GetProviderConfig().Services
-
- if len(providerServices) == 0 {
- panic("provider service map is null")
- }
+ //providerServices := config.GetProviderConfig().Services
+ //
+ //if len(providerServices) == 0 {
+ // panic("provider service map is null")
+ //}
// todo: remove this logic?
// wait all exporter ready , then set proxy impl and grpc registerService
- waitTripleExporter(providerServices)
+ //waitTripleExporter(providerServices)
mux := http.NewServeMux()
- handleService(providerServices, mux)
+ if info != nil {
+ handleServiceWithInfo(invoker, info, mux)
+ }
// todo: figure it out this process
//reflection.Register(server)
// todo: without tls
@@ -173,68 +167,94 @@
return
}
case <-ta.C:
- panic("wait GRPC_NEW exporter timeout when start GRPC_NEW server")
+ panic("wait Triple exporter timeout when start GRPC_NEW server")
}
}
}
-// handleService injects invoker and creates handler based on ServiceConfig and provider service.
-func handleService(providerServices map[string]*config.ServiceConfig, mux *http.ServeMux, opts ...tri.HandlerOption) {
- for key, providerService := range providerServices {
- service := config.GetProviderService(key)
- ds, ok := service.(TripleService)
- if !ok {
- panic("illegal service type registered")
- }
+//// handleService injects invoker and creates handler based on ServiceConfig and provider service.
+//func handleService(providerServices map[string]*config.ServiceConfig, mux *http.ServeMux, opts ...tri.HandlerOption) {
+// for key, providerService := range providerServices {
+// service := config.GetProviderService(key)
+// ds, ok := service.(TripleService)
+// if !ok {
+// panic("illegal service type registered")
+// }
+//
+// serviceKey := common.ServiceKey(providerService.Interface, providerService.Group, providerService.Version)
+// exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
+// if exporter == nil {
+// panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
+// }
+// invoker := exporter.(protocol.Exporter).GetInvoker()
+// if invoker == nil {
+// panic(fmt.Sprintf("no invoker found for servicekey: %v", serviceKey))
+// }
+//
+// // inject invoker, it has all invocation logics
+// ds.SetProxyImpl(invoker)
+// path, handler := ds.BuildHandler(service, opts...)
+// mux.Handle(path, tri.New)
+// mux.Handle(path, handler)
+// }
+//}
- serviceKey := common.ServiceKey(providerService.Interface, providerService.Group, providerService.Version)
- exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
- if exporter == nil {
- panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
- }
- invoker := exporter.(protocol.Exporter).GetInvoker()
- if invoker == nil {
- panic(fmt.Sprintf("no invoker found for servicekey: %v", serviceKey))
- }
-
- // inject invoker, it has all invocation logics
- ds.SetProxyImpl(invoker)
- path, handler := ds.BuildHandler(service, opts...)
- mux.Handle(path, tri.New)
- mux.Handle(path, handler)
- }
-}
-
-func handleService2(url *common.URL, mux *http.ServeMux, opts ...tri.HandlerOption) {
- interfaceName := ""
- methodNames := []string{}
- types := []string{}
- serviceKey := url.ServiceKey()
- exporter, ok := tripleProtocol.ExporterMap().Load(serviceKey)
- if !ok {
- panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
- }
- for i, method := range methodNames {
+// handleServiceWithInfo injects invoker and create handler based on ServiceInfo
+func handleServiceWithInfo(invoker protocol.Invoker, info *server.ServiceInfo, mux *http.ServeMux, opts ...tri.HandlerOption) {
+ for _, method := range info.Methods {
var handler http.Handler
- procedure := path.Join(interfaceName, method)
- typ := types[i]
- switch typ {
+ procedure := path.Join(info.InterfaceName, method.Name)
+ switch method.Type {
case constant.CallUnary:
handler = tri.NewUnaryHandler(
procedure,
+ method.ReqInitFunc,
func(ctx context.Context, req *tri.Request) (*tri.Response, error) {
var args []interface{}
- args = append(args, req.)
- // provider.Serve()
- invo := invocation.NewRPCInvocation("Greet", args, nil)
- res := s.proxyImpl.Invoke(ctx, invo)
- return res.Result().(*triple_protocol.Response[proto.GreetResponse]), res.Error()
+ args = append(args, req.Msg)
+ // todo: inject method.Meta to attachments
+ invo := invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Result().(*tri.Response), res.Error()
},
opts...,
)
-
+ case constant.CallClientStream:
+ handler = tri.NewClientStreamHandler(
+ procedure,
+ func(ctx context.Context, stream *tri.ClientStream) (*tri.Response, error) {
+ var args []interface{}
+ args = append(args, method.StreamInitFunc(stream))
+ invo := invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Result().(*tri.Response), res.Error()
+ },
+ )
+ case constant.CallServerStream:
+ handler = tri.NewServerStreamHandler(
+ procedure,
+ method.ReqInitFunc,
+ func(ctx context.Context, request *tri.Request, stream *tri.ServerStream) error {
+ var args []interface{}
+ args = append(args, request.Msg, method.StreamInitFunc(stream))
+ invo := invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Error()
+ },
+ )
+ case constant.CallBidiStream:
+ handler = tri.NewBidiStreamHandler(
+ procedure,
+ func(ctx context.Context, stream *tri.BidiStream) error {
+ var args []interface{}
+ args = append(args, method.StreamInitFunc(stream))
+ invo := invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Error()
+ },
+ )
}
-
+ mux.Handle(procedure, handler)
}
}
diff --git a/protocol/triple/triple.go b/protocol/triple/triple.go
index b429b91..8f7c1a7 100644
--- a/protocol/triple/triple.go
+++ b/protocol/triple/triple.go
@@ -29,6 +29,7 @@
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
const (
@@ -54,14 +55,17 @@
func (tp *TripleProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetURL()
serviceKey := url.ServiceKey()
+ // todo: retrieve this info from url
+ info := &server.ServiceInfo{}
exporter := NewTripleExporter(serviceKey, invoker, tp.ExporterMap())
tp.SetExporterMap(serviceKey, exporter)
logger.Infof("[TRIPLE Protocol] Export service: %s", url.String())
- tp.openServer(url)
+ tp.openServer(invoker, info)
return exporter
}
-func (tp *TripleProtocol) openServer(url *common.URL) {
+func (tp *TripleProtocol) openServer(invoker protocol.Invoker, info *server.ServiceInfo) {
+ url := invoker.GetURL()
tp.serverLock.Lock()
defer tp.serverLock.Unlock()
@@ -69,23 +73,17 @@
return
}
- // todo: remove this logic?
if _, ok := tp.ExporterMap().Load(url.ServiceKey()); !ok {
- panic("[GRPC_NEW Protocol]" + url.Key() + "is not existing")
+ panic("[TRIPLE Protocol]" + url.Key() + "is not existing")
}
srv := NewServer()
tp.serverMap[url.Location] = srv
- srv.Start(url)
+ srv.Start(invoker, info)
}
// Refer a remote triple service
func (tp *TripleProtocol) Refer(url *common.URL) protocol.Invoker {
- //client, err := NewClient(url)
- //if err != nil {
- // logger.Warnf("can't dial the server: %s", url.Key())
- // return nil
- //}
invoker, err := NewTripleInvoker(url)
if err != nil {
logger.Warnf("can't dial the server: %s", url.Key())
@@ -97,7 +95,7 @@
}
func (tp *TripleProtocol) Destroy() {
- logger.Infof("GrpcProtocol destroy.")
+ logger.Infof("TripleProtocol destroy.")
tp.serverLock.Lock()
defer tp.serverLock.Unlock()
diff --git a/protocol/triple/triple_protocol/handler.go b/protocol/triple/triple_protocol/handler.go
index 25f4950..2593389 100644
--- a/protocol/triple/triple_protocol/handler.go
+++ b/protocol/triple/triple_protocol/handler.go
@@ -137,14 +137,14 @@
procedure,
StreamTypeServer,
func(ctx context.Context, conn StreamingHandlerConn) error {
- msg := reqInitFunc()
- if err := conn.Receive(&msg); err != nil {
+ req := reqInitFunc()
+ if err := conn.Receive(&req); err != nil {
return err
}
return implementation(
ctx,
&Request{
- Msg: &msg,
+ Msg: req,
spec: conn.Spec(),
peer: conn.Peer(),
header: conn.RequestHeader(),
diff --git a/registry/registry_config.go b/registry/registry_config.go
index 774b816..7ecac63 100644
--- a/registry/registry_config.go
+++ b/registry/registry_config.go
@@ -83,7 +83,7 @@
urlMap.Set(constant.RegistryKey+"."+constant.RegistryZoneKey, c.Zone)
urlMap.Set(constant.RegistryKey+"."+constant.WeightKey, strconv.FormatInt(c.Weight, 10))
urlMap.Set(constant.RegistryTTLKey, c.TTL)
- urlMap.Set(constant.ClientNameKey, commonCfg.ClientNameID(c, c.Protocol, c.Address))
+ urlMap.Set(constant.ClientNameKey, ClientNameID(c, c.Protocol, c.Address))
for k, v := range c.Params {
urlMap.Set(k, v)
@@ -111,7 +111,7 @@
common.WithUsername(c.Username),
common.WithPassword(c.Password),
common.WithParamsValue(constant.TimeoutKey, c.Timeout),
- common.WithParamsValue(constant.ClientNameKey, commonCfg.ClientNameID(c, c.Protocol, c.Address)),
+ common.WithParamsValue(constant.ClientNameKey, ClientNameID(c, c.Protocol, c.Address)),
common.WithParamsValue(constant.MetadataReportGroupKey, c.Group),
common.WithParamsValue(constant.MetadataReportNamespaceKey, c.Namespace),
)
@@ -245,6 +245,11 @@
return registryURLs
}
+// ClientNameID unique identifier id for client
+func ClientNameID(config *RegistryConfig, protocol, address string) string {
+ return strings.Join([]string{config.Prefix(), protocol, address}, "-")
+}
+
func (c *RegistryConfig) createNewURL(protocol string, address string, roleType common.RoleType) (*common.URL, error) {
return common.NewURL(protocol+"://"+address,
common.WithParams(c.getUrlMap(roleType)),
diff --git a/provider/options.go b/server/options.go
similarity index 75%
rename from provider/options.go
rename to server/options.go
index f762c80..01c35d5 100644
--- a/provider/options.go
+++ b/server/options.go
@@ -1,4 +1,4 @@
-package provider
+package server
type Options struct {
}
diff --git a/provider/provider.go b/server/server.go
similarity index 69%
rename from provider/provider.go
rename to server/server.go
index 7252417..b860be8 100644
--- a/provider/provider.go
+++ b/server/server.go
@@ -1,14 +1,14 @@
-package provider
+package server
import (
"context"
)
-type Provider struct {
+type Server struct {
}
-// Provide assemble invoker chains like ProviderConfig.Load, init a service per call
-func (pro *Provider) Provide(handler interface{}, info *ServiceInfo, opts ...Option) error {
+// Register assemble invoker chains like ProviderConfig.Load, init a service per call
+func (pro *Server) Register(handler interface{}, info *ServiceInfo, opts ...Option) error {
// put information from info to url
// ProviderConfig.Load
@@ -33,6 +33,6 @@
Meta map[string]interface{}
}
-func NewProvider() (*Provider, error) {
+func NewServer() (*Server, error) {
return nil, nil
}