grpc-register&openServer code style
diff --git a/config/service.go b/config/service.go
index b746141..6deff3b 100644
--- a/config/service.go
+++ b/config/service.go
@@ -46,6 +46,11 @@
return proServices[name]
}
+// GetAllProviderService gets all ProviderService
+func GetAllProviderService() map[string]common.RPCService {
+ return proServices
+}
+
// GetCallback gets CallbackResponse by @name
func GetCallback(name string) func(response common.CallbackResponse) {
service := GetConsumerService(name)
diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go
index db47503..bf7b782 100644
--- a/protocol/grpc/grpc_protocol.go
+++ b/protocol/grpc/grpc_protocol.go
@@ -68,24 +68,22 @@
}
func (gp *GrpcProtocol) openServer(url *common.URL) {
- _, ok := gp.serverMap[url.Location]
- if !ok {
- _, ok := gp.ExporterMap().Load(url.ServiceKey())
- if !ok {
- panic("[GrpcProtocol]" + url.Key() + "is not existing")
- }
+ gp.serverLock.Lock()
+ defer gp.serverLock.Unlock()
- gp.serverLock.Lock()
- _, ok = gp.serverMap[url.Location]
- if !ok {
- grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
- srv := NewServer()
- srv.SetBufferSize(grpcMessageSize)
- gp.serverMap[url.Location] = srv
- srv.Start(url)
- }
- gp.serverLock.Unlock()
+ if _, ok := gp.serverMap[url.Location]; ok {
+ return
}
+
+ if _, ok := gp.ExporterMap().Load(url.ServiceKey()); !ok {
+ panic("[GrpcProtocol]" + url.Key() + "is not existing")
+ }
+
+ grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.MESSAGE_SIZE_KEY, "4"))
+ srv := NewServer()
+ srv.SetBufferSize(grpcMessageSize)
+ gp.serverMap[url.Location] = srv
+ srv.Start(url)
}
// Refer a remote gRPC service
diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go
index e77e2ba..ce8fca8 100644
--- a/protocol/grpc/server.go
+++ b/protocol/grpc/server.go
@@ -21,6 +21,8 @@
"fmt"
"net"
"reflect"
+ "sync"
+ "time"
)
import (
@@ -31,7 +33,6 @@
import (
"github.com/apache/dubbo-go/common"
- "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
@@ -80,42 +81,88 @@
grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
grpc.MaxRecvMsgSize(1024*1024*s.bufferSize),
grpc.MaxSendMsgSize(1024*1024*s.bufferSize))
-
- key := url.GetParam(constant.BEAN_NAME_KEY, "")
- service := config.GetProviderService(key)
-
- ds, ok := service.(DubboGrpcService)
- if !ok {
- panic("illegal service type registered")
- }
-
- m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl")
- if !ok {
- panic("method SetProxyImpl is necessary for grpc service")
- }
-
- exporter, _ := grpcProtocol.ExporterMap().Load(url.ServiceKey())
- if exporter == nil {
- panic(fmt.Sprintf("no exporter found for servicekey: %v", url.ServiceKey()))
- }
- invoker := exporter.(protocol.Exporter).GetInvoker()
- if invoker == nil {
- panic(fmt.Sprintf("no invoker found for servicekey: %v", url.ServiceKey()))
- }
- in := []reflect.Value{reflect.ValueOf(service)}
- in = append(in, reflect.ValueOf(invoker))
- m.Func.Call(in)
-
- server.RegisterService(ds.ServiceDesc(), service)
-
s.grpcServer = server
+
go func() {
+ providerServices := config.GetProviderConfig().Services
+
+ if len(providerServices) == 0 {
+ panic("provider service map is null")
+ }
+
+ waitGrpcExporter(providerServices)
+ registerService(providerServices, server)
+
if err = server.Serve(lis); err != nil {
logger.Errorf("server serve failed with err: %v", err)
}
}()
}
+// getSyncMapLen get sync map len
+func getSyncMapLen(m *sync.Map) int {
+ length := 0
+
+ m.Range(func(_, _ interface{}) bool {
+ length++
+ return true
+ })
+ return length
+}
+
+// waitGrpcExporter wait until len(providerServices) = len(ExporterMap)
+func waitGrpcExporter(providerServices map[string]*config.ServiceConfig) {
+ t := time.NewTicker(50 * time.Millisecond)
+ defer t.Stop()
+ pLen := len(providerServices)
+ ta := time.After(10 * time.Second)
+
+ for {
+ select {
+ case <-t.C:
+ mLen := getSyncMapLen(grpcProtocol.ExporterMap())
+ if pLen == mLen {
+ return
+ }
+ case <-ta:
+ panic("wait grpc exporter timeout when start grpc server")
+ }
+ }
+}
+
+// registerService SetProxyImpl invoker and grpc service
+func registerService(providerServices map[string]*config.ServiceConfig, server *grpc.Server) {
+ for key, providerService := range providerServices {
+ service := config.GetProviderService(key)
+
+ ds, ok := service.(DubboGrpcService)
+ if !ok {
+ panic("illegal service type registered")
+ }
+
+ m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl")
+ if !ok {
+ panic("method SetProxyImpl is necessary for grpc service")
+ }
+ serviceKey := common.ServiceKey(providerService.InterfaceName, providerService.Group, providerService.Version)
+
+ exporter, _ := grpcProtocol.ExporterMap().Load(serviceKey)
+ if exporter == nil {
+ panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
+ }
+ invoker := exporter.(protocol.Exporter).GetInvoker()
+ if invoker == nil {
+ panic(fmt.Sprintf("no invoker found for servicekey: %v", serviceKey))
+ }
+ in := []reflect.Value{reflect.ValueOf(service)}
+ in = append(in, reflect.ValueOf(invoker))
+ m.Func.Call(in)
+
+ server.RegisterService(ds.ServiceDesc(), service)
+
+ }
+}
+
// Stop gRPC server
func (s *Server) Stop() {
s.grpcServer.Stop()