Merge pull request #1112 from ztelur/feature-grpc-register
Feature grpc register
diff --git a/config/service.go b/config/service.go
index b746141..6deff3b 100644
--- a/config/service.go
+++ b/config/service.go
@@ -46,6 +46,11 @@
return proServices[name]
}
+// GetAllProviderService returns the proServices map itself (not a copy), so callers share it with this package.
+func GetAllProviderService() map[string]common.RPCService {
+ return proServices
+}
+
// GetCallback gets CallbackResponse by @name
func GetCallback(name string) func(response common.CallbackResponse) {
service := GetConsumerService(name)
diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go
index 24ab125..b8a9143 100644
--- a/protocol/grpc/client.go
+++ b/protocol/grpc/client.go
@@ -90,20 +90,27 @@
}
// NewClient creates a new gRPC client.
-func NewClient(url *common.URL) *Client {
+func NewClient(url *common.URL) (*Client, error) {
// if global trace instance was set , it means trace function enabled. If not , will return Nooptracer
tracer := opentracing.GlobalTracer()
dialOpts := make([]grpc.DialOption, 0, 4)
maxMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
- dialOpts = append(dialOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(
+
+ // connect timeout used when dialing, taken from the consumer config
+ connectTimeout := config.GetConsumerConfig().ConnectTimeout
+
+ dialOpts = append(dialOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(connectTimeout), grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())),
grpc.WithDefaultCallOptions(
grpc.CallContentSubtype(clientConf.ContentSubType),
grpc.MaxCallRecvMsgSize(1024*1024*maxMessageSize),
grpc.MaxCallSendMsgSize(1024*1024*maxMessageSize)))
+
conn, err := grpc.Dial(url.Location, dialOpts...)
+
if err != nil {
- panic(err)
+ logger.Errorf("grpc dail error: %v", err)
+ return nil, err
}
key := url.GetParam(constant.BEAN_NAME_KEY, "")
@@ -113,7 +120,7 @@
return &Client{
ClientConn: conn,
invoker: reflect.ValueOf(invoker),
- }
+ }, nil
}
func getInvoker(impl interface{}, conn *grpc.ClientConn) interface{} {
diff --git a/protocol/grpc/client_test.go b/protocol/grpc/client_test.go
index 099f03e..a4630c8 100644
--- a/protocol/grpc/client_test.go
+++ b/protocol/grpc/client_test.go
@@ -29,6 +29,7 @@
import (
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol/grpc/internal"
)
@@ -48,6 +49,9 @@
url, err := common.NewURL("grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)
- cli := NewClient(url)
+ cli, err := NewClient(url)
+ if err != nil {
+ logger.Errorf("grpc new client error %v", err)
+ }
assert.NotNil(t, cli)
}
diff --git a/protocol/grpc/grpc_invoker_test.go b/protocol/grpc/grpc_invoker_test.go
index d5ebbb4..c8066c7 100644
--- a/protocol/grpc/grpc_invoker_test.go
+++ b/protocol/grpc/grpc_invoker_test.go
@@ -29,6 +29,7 @@
import (
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol/grpc/internal"
"github.com/apache/dubbo-go/protocol/invocation"
)
@@ -48,8 +49,10 @@
url, err := common.NewURL(mockGrpcCommonUrl)
assert.Nil(t, err)
- cli := NewClient(url)
-
+ cli, err := NewClient(url)
+ if err != nil {
+ logger.Errorf("grpc new client error %v", err)
+ }
invoker := NewGrpcInvoker(url, cli)
args := []reflect.Value{}
diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go
index 3ad1245..ec1f5aa 100644
--- a/protocol/grpc/grpc_protocol.go
+++ b/protocol/grpc/grpc_protocol.go
@@ -68,29 +68,32 @@
}
func (gp *GrpcProtocol) openServer(url *common.URL) {
- _, ok := gp.serverMap[url.Location]
- if !ok {
- _, ok := gp.ExporterMap().Load(url.ServiceKey())
- if !ok {
- panic("[GrpcProtocol]" + url.Key() + "is not existing")
- }
+ gp.serverLock.Lock()
+ defer gp.serverLock.Unlock()
- gp.serverLock.Lock()
- _, ok = gp.serverMap[url.Location]
- if !ok {
- grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
- srv := NewServer()
- srv.SetBufferSize(grpcMessageSize)
- gp.serverMap[url.Location] = srv
- srv.Start(url)
- }
- gp.serverLock.Unlock()
+ if _, ok := gp.serverMap[url.Location]; ok {
+ return
}
+
+ if _, ok := gp.ExporterMap().Load(url.ServiceKey()); !ok {
+ panic("[GrpcProtocol]" + url.Key() + "is not existing")
+ }
+
+ grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
+ srv := NewServer()
+ srv.SetBufferSize(grpcMessageSize)
+ gp.serverMap[url.Location] = srv
+ srv.Start(url)
}
// Refer a remote gRPC service
func (gp *GrpcProtocol) Refer(url *common.URL) protocol.Invoker {
- invoker := NewGrpcInvoker(url, NewClient(url))
+ client, err := NewClient(url)
+ if err != nil {
+ logger.Warnf("can't dial the server: %s", url.Key())
+ return nil
+ }
+ invoker := NewGrpcInvoker(url, client)
gp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
@@ -102,6 +105,8 @@
gp.BaseProtocol.Destroy()
+ gp.serverLock.Lock()
+ defer gp.serverLock.Unlock()
for key, server := range gp.serverMap {
delete(gp.serverMap, key)
server.Stop()
diff --git a/protocol/grpc/grpc_protocol_test.go b/protocol/grpc/grpc_protocol_test.go
index 87ce714..71f05d0 100644
--- a/protocol/grpc/grpc_protocol_test.go
+++ b/protocol/grpc/grpc_protocol_test.go
@@ -28,13 +28,49 @@
import (
"github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/grpc/internal"
)
+func doInitProvider() { // test helper: installs a provider config that declares a single grpc service
+ providerConfig := config.ProviderConfig{
+ BaseConfig: config.BaseConfig{
+ ApplicationConfig: &config.ApplicationConfig{
+ Organization: "dubbo_org",
+ Name: "BDTService",
+ Module: "module",
+ Version: "0.0.1",
+ Owner: "dubbo",
+ Environment: "test",
+ },
+ },
+ Services: map[string]*config.ServiceConfig{
+ "GrpcGreeterImpl": { // registerService in server.go passes each key of this map to config.GetProviderService
+ InterfaceName: "io.grpc.examples.helloworld.GreeterGrpc$IGreeter", // registerService builds the exporter lookup key from InterfaceName, Group and Version
+ Protocol: "grpc",
+ Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2,hangzhou_service_discovery_reg",
+ Cluster: "failover",
+ Loadbalance: "random",
+ Retries: "3",
+ Methods: []*config.MethodConfig{
+ {
+ Name: "SayHello",
+ Retries: "2",
+ LoadBalance: "random",
+ Weight: 200,
+ },
+ },
+ },
+ },
+ }
+ config.SetProviderConfig(providerConfig) // the gRPC server's start goroutine panics on an empty Services map; presumably it reads this config back via config.GetProviderConfig — confirm
+}
+
func TestGrpcProtocolExport(t *testing.T) {
// Export
addService()
+ doInitProvider()
proto := GetProtocol()
url, err := common.NewURL(mockGrpcCommonUrl)
diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go
index e77e2ba..23460a0 100644
--- a/protocol/grpc/server.go
+++ b/protocol/grpc/server.go
@@ -21,6 +21,8 @@
"fmt"
"net"
"reflect"
+ "sync"
+ "time"
)
import (
@@ -31,7 +33,6 @@
import (
"github.com/apache/dubbo-go/common"
- "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
@@ -80,42 +81,88 @@
grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
grpc.MaxRecvMsgSize(1024*1024*s.bufferSize),
grpc.MaxSendMsgSize(1024*1024*s.bufferSize))
-
- key := url.GetParam(constant.BEAN_NAME_KEY, "")
- service := config.GetProviderService(key)
-
- ds, ok := service.(DubboGrpcService)
- if !ok {
- panic("illegal service type registered")
- }
-
- m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl")
- if !ok {
- panic("method SetProxyImpl is necessary for grpc service")
- }
-
- exporter, _ := grpcProtocol.ExporterMap().Load(url.ServiceKey())
- if exporter == nil {
- panic(fmt.Sprintf("no exporter found for servicekey: %v", url.ServiceKey()))
- }
- invoker := exporter.(protocol.Exporter).GetInvoker()
- if invoker == nil {
- panic(fmt.Sprintf("no invoker found for servicekey: %v", url.ServiceKey()))
- }
- in := []reflect.Value{reflect.ValueOf(service)}
- in = append(in, reflect.ValueOf(invoker))
- m.Func.Call(in)
-
- server.RegisterService(ds.ServiceDesc(), service)
-
s.grpcServer = server
+
go func() {
+ providerServices := config.GetProviderConfig().Services
+
+ if len(providerServices) == 0 {
+ panic("provider service map is null")
+ }
+ // wait until all exporters are ready, then set the proxy impl and register the gRPC services
+ waitGrpcExporter(providerServices)
+ registerService(providerServices, server)
+
if err = server.Serve(lis); err != nil {
logger.Errorf("server serve failed with err: %v", err)
}
}()
}
+// getSyncMapLen returns the number of entries in m, counted by walking it with Range (sync.Map has no Len method).
+func getSyncMapLen(m *sync.Map) int {
+ length := 0
+
+ m.Range(func(_, _ interface{}) bool {
+ length++
+ return true // returning true tells Range to keep iterating
+ })
+ return length
+}
+
+// waitGrpcExporter polls every 50ms until the exporter map holds exactly len(providerServices) entries; it panics after 10s.
+func waitGrpcExporter(providerServices map[string]*config.ServiceConfig) {
+ t := time.NewTicker(50 * time.Millisecond)
+ defer t.Stop()
+ pLen := len(providerServices)
+ ta := time.After(10 * time.Second) // created once, outside the loop, so the 10s deadline covers the whole wait
+
+ for {
+ select {
+ case <-t.C:
+ mLen := getSyncMapLen(grpcProtocol.ExporterMap())
+ if pLen == mLen { // NOTE(review): strict equality — if the exporter map ever holds more entries than providerServices this never matches; confirm intended
+ return
+ }
+ case <-ta:
+ panic("wait grpc exporter timeout when start grpc server") // NOTE(review): the caller runs this in a goroutine, so an unrecovered panic here ends the process — confirm intended
+ }
+ }
+}
+
+// registerService calls SetProxyImpl(invoker) on every provider service in providerServices and registers it on server; it panics on any mismatch.
+func registerService(providerServices map[string]*config.ServiceConfig, server *grpc.Server) {
+ for key, providerService := range providerServices {
+ service := config.GetProviderService(key) // key is the name the service has in the provider config map
+
+ ds, ok := service.(DubboGrpcService) // NOTE(review): this loop has no protocol filter, so any entry that is not a DubboGrpcService hits the panic below — confirm intended
+ if !ok {
+ panic("illegal service type registered")
+ }
+
+ m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl") // resolved by reflection on the service's dynamic type
+ if !ok {
+ panic("method SetProxyImpl is necessary for grpc service")
+ }
+ serviceKey := common.ServiceKey(providerService.InterfaceName, providerService.Group, providerService.Version) // key used to look the exporter up in the exporter map
+
+ exporter, _ := grpcProtocol.ExporterMap().Load(serviceKey)
+ if exporter == nil {
+ panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
+ }
+ invoker := exporter.(protocol.Exporter).GetInvoker()
+ if invoker == nil {
+ panic(fmt.Sprintf("no invoker found for servicekey: %v", serviceKey))
+ }
+ in := []reflect.Value{reflect.ValueOf(service)} // m.Func takes the receiver as its first argument
+ in = append(in, reflect.ValueOf(invoker))
+ m.Func.Call(in) // equivalent to service.SetProxyImpl(invoker)
+
+ server.RegisterService(ds.ServiceDesc(), service)
+
+ }
+}
+
// Stop gRPC server
func (s *Server) Stop() {
s.grpcServer.Stop()
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 8c15240..b6f9322 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -111,7 +111,7 @@
if event == nil {
return
}
- go dir.refreshInvokers(event)
+ dir.refreshInvokers(event)
}
// NotifyAll notify the events that are complete Service Event List.
@@ -370,6 +370,8 @@
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
+ } else {
+ logger.Warnf("service will be added in cache invokers fail, result is null, invokers url is %+v", newUrl.String())
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
@@ -383,6 +385,8 @@
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
return cacheInvoker.(protocol.Invoker), true
+ } else {
+ logger.Warnf("service will be updated in cache invokers fail, result is null, invokers url is %+v", newUrl.String())
}
}
return nil, false
diff --git a/test/integrate/dubbo/go-client/go.mod b/test/integrate/dubbo/go-client/go.mod
index 4708eb1..95a0449 100644
--- a/test/integrate/dubbo/go-client/go.mod
+++ b/test/integrate/dubbo/go-client/go.mod
@@ -1,3 +1,10 @@
module github.com/apache/dubbo-go/test/integrate/dubbo/go-client
go 1.13
+
+require (
+ github.com/apache/dubbo-go v1.5.5
+ github.com/apache/dubbo-go-hessian2 v1.9.1
+)
+
+replace github.com/apache/dubbo-go => ../../../../../dubbo-go
diff --git a/test/integrate/dubbo/go-server/go.mod b/test/integrate/dubbo/go-server/go.mod
index 9e11623..0de92a4 100644
--- a/test/integrate/dubbo/go-server/go.mod
+++ b/test/integrate/dubbo/go-server/go.mod
@@ -1,3 +1,10 @@
module github.com/apache/dubbo-go/test/integrate/dubbo/go-server
go 1.13
+
+require (
+ github.com/apache/dubbo-go v1.5.5
+ github.com/apache/dubbo-go-hessian2 v1.9.1
+)
+
+replace github.com/apache/dubbo-go => ../../../../../dubbo-go