Modify GRPC module and optimize dependencies
diff --git a/syncer/client/sync_client.go b/syncer/client/sync_client.go
new file mode 100644
index 0000000..6642b88
--- /dev/null
+++ b/syncer/client/sync_client.go
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "crypto/tls"
+ "sync"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/syncer/grpc"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
+ ggrpc "google.golang.org/grpc"
+)
+
+var (
+ clients sync.Map
+)
+
+// Client struct
+type Client struct {
+ addr string
+ conn *ggrpc.ClientConn
+ cli pb.SyncClient
+}
+
+// NewSyncClient Get the client from the client caches with addr
+func NewSyncClient(addr string, tlsConf *tls.Config) (cli *Client) {
+ val, ok := clients.Load(addr)
+ if ok {
+ cli = val.(*Client)
+ } else {
+ grpc.InjectClient(func(conn *ggrpc.ClientConn) {
+ cli = &Client{
+ addr: addr,
+ conn: conn,
+ cli: pb.NewSyncClient(conn),
+ }
+ clients.Store(addr, cli)
+ }, grpc.WithAddr(addr), grpc.WithTLSConfig(tlsConf))
+ }
+ return
+}
+
+// Pull implement the interface of sync server
+func (c *Client) Pull(ctx context.Context) (*pb.SyncData, error) {
+ data, err := c.cli.Pull(ctx, &pb.PullRequest{})
+ if err != nil {
+ log.Errorf(err, "Pull from grpc client failed, going to close the client")
+ closeClient(c.addr)
+ }
+ return data, err
+}
+
+func closeClient(addr string) {
+ val, ok := clients.Load(addr)
+ if ok {
+ cli := val.(*Client)
+ cli.conn.Close()
+ clients.Delete(addr)
+ log.Infof("Close grpc client connection to %s", addr)
+ }
+}
diff --git a/syncer/grpc/client.go b/syncer/grpc/client.go
deleted file mode 100644
index 19f49ba..0000000
--- a/syncer/grpc/client.go
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package grpc
-
-import (
- "context"
- "crypto/tls"
- "sync"
-
- "github.com/apache/servicecomb-service-center/pkg/log"
- pb "github.com/apache/servicecomb-service-center/syncer/proto"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
-)
-
-var (
- clients = make(map[string]*Client)
- lock sync.RWMutex
-)
-
-// Client struct
-type Client struct {
- addr string
- conn *grpc.ClientConn
- cli pb.SyncClient
-}
-
-func Pull(ctx context.Context, addr string, tlsConf *tls.Config) (*pb.SyncData, error) {
- cli := getClient(addr, tlsConf)
-
- data, err := cli.cli.Pull(ctx, &pb.PullRequest{})
- if err != nil {
- log.Errorf(err, "Pull from grpc failed, going to close the client")
- closeClient(addr)
- }
- return data, err
-}
-
-func closeClient(addr string) {
- lock.RLock()
- cli, ok := clients[addr]
- lock.RUnlock()
- if ok {
-
- cli.conn.Close()
- lock.Lock()
- delete(clients, addr)
- lock.Unlock()
- log.Infof("Close grpc connection to %s", addr)
- }
-}
-
-// GetClient Get the client from the client caches with addr
-func getClient(addr string, tlsConf *tls.Config) *Client {
- lock.RLock()
- cli, ok := clients[addr]
- lock.RUnlock()
- if !ok {
- var conn *grpc.ClientConn
- var err error
-
- log.Infof("Create new grpc connection to %s", addr)
-
- if tlsConf != nil {
- conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConf)))
- } else {
- conn, err = grpc.Dial(addr, grpc.WithInsecure())
- }
-
- if err != nil {
- log.Error("create grpc client conn failed", err)
- return nil
- }
- cli = &Client{conn: conn, cli: pb.NewSyncClient(conn), addr: addr}
- lock.Lock()
- clients[addr] = cli
- lock.Unlock()
- }
- return cli
-}
diff --git a/syncer/grpc/grpc.go b/syncer/grpc/grpc.go
new file mode 100644
index 0000000..f33f2d8
--- /dev/null
+++ b/syncer/grpc/grpc.go
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "context"
+ "net"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/rpc"
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ "github.com/pkg/errors"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+)
+
+// Server struct
+type Server struct {
+ server *grpc.Server
+ listener net.Listener
+ running *utils.AtomicBool
+
+ readyCh chan struct{}
+ stopCh chan struct{}
+}
+
+// NewServer new grpc server with options
+func NewServer(ops ...Option) (*Server, error) {
+ conf := toGRPCConfig(ops...)
+ var srv *grpc.Server
+ if conf.tlsConfig != nil {
+ srv = grpc.NewServer(grpc.Creds(credentials.NewTLS(conf.tlsConfig)))
+ } else {
+ srv = grpc.NewServer()
+ }
+
+ rpc.RegisterGRpcServer(srv)
+
+ ls, err := net.Listen("tcp", conf.addr)
+ if err != nil {
+ return nil, errors.Wrapf(err, "grpc: listen failed, addr = %s", conf.addr)
+ }
+
+ return &Server{
+ server: srv,
+ listener: ls,
+ running: utils.NewAtomicBool(false),
+ readyCh: make(chan struct{}),
+ stopCh: make(chan struct{}),
+ }, nil
+}
+
+// Start grpc server
+func (s *Server) Start(ctx context.Context) {
+ s.running.DoToReverse(false, func() {
+ go func() {
+ err := s.server.Serve(s.listener)
+ if err != nil {
+ log.Error("grpc: start server failed", err)
+ s.Stop()
+ }
+ }()
+ close(s.readyCh)
+ go s.wait(ctx)
+ })
+}
+
+// Ready Returns a channel that will be closed when etcd is ready
+func (s *Server) Ready() <-chan struct{} {
+ return s.readyCh
+}
+
+// Stopped Returns a channel that will be closed when etcd is stopped
+func (s *Server) Stopped() <-chan struct{} {
+ return s.stopCh
+}
+
+// Stop etcd server
+func (s *Server) Stop() {
+ s.running.DoToReverse(true, func() {
+ if s.server != nil {
+ log.Info("grpc: begin shutdown")
+ s.server.Stop()
+ close(s.stopCh)
+ }
+ log.Info("grpc: shutdown complete")
+ })
+ return
+}
+
+func (s *Server) wait(ctx context.Context) {
+ select {
+ case <-s.stopCh:
+ log.Warn("grpc: server stopped, exited")
+ case <-ctx.Done():
+ log.Warn("grpc: cancel server by context")
+ s.Stop()
+ }
+}
+
+// InjectClient inject grpc client to proto module
+func InjectClient(injection func(conn *grpc.ClientConn), ops ...Option) error {
+ conf := toGRPCConfig(ops...)
+
+ var conn *grpc.ClientConn
+ var err error
+
+ if conf.tlsConfig != nil {
+ conn, err = grpc.Dial(conf.addr, grpc.WithTransportCredentials(credentials.NewTLS(conf.tlsConfig)))
+ } else {
+ conn, err = grpc.Dial(conf.addr, grpc.WithInsecure())
+ }
+
+ if err != nil {
+ return errors.Wrapf(err, "grpc: create grpc client conn failed, addr = %s", conf.addr)
+ }
+
+ injection(conn)
+ return nil
+}
diff --git a/syncer/grpc/grpc_test.go b/syncer/grpc/grpc_test.go
new file mode 100644
index 0000000..8240a82
--- /dev/null
+++ b/syncer/grpc/grpc_test.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/rpc"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
+ "github.com/stretchr/testify/assert"
+ ggrpc "google.golang.org/grpc"
+)
+
+type testServer struct {}
+
+func (t *testServer) Pull(context.Context, *pb.PullRequest) (*pb.SyncData, error){
+ return &pb.SyncData{}, nil
+}
+
+func TestGRPCServer(t *testing.T) {
+ syncSvr := &testServer{}
+ addr := "127.0.0.1:9099"
+ svr, err := NewServer(
+ WithAddr(addr),
+ WithTLSConfig(nil),
+ )
+ assert.Nil(t, err)
+
+ rpc.RegisterService(func(s *ggrpc.Server) {
+ pb.RegisterSyncServer(s, syncSvr)
+ })
+
+ err = startServer(context.Background(), svr)
+ assert.Nil(t, err)
+
+ err = InjectClient(func(conn *ggrpc.ClientConn) {}, WithAddr(addr))
+ assert.Nil(t, err)
+
+ svr.Stop()
+}
+
+func startServer(ctx context.Context, svr *Server) (err error) {
+ svr.Start(ctx)
+ select {
+ case <-svr.Ready():
+ case <-svr.Stopped():
+ err = errors.New("start grpc server failed")
+ case <-time.After(time.Second * 3):
+ err = errors.New("start grpc server timeout")
+ }
+ return
+}
diff --git a/syncer/grpc/option.go b/syncer/grpc/option.go
new file mode 100644
index 0000000..e5ca425
--- /dev/null
+++ b/syncer/grpc/option.go
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "crypto/tls"
+)
+
+type config struct {
+ addr string
+ tlsConfig *tls.Config
+}
+
+// Option to grpc config
+type Option func(*config)
+
+// WithAddr returns address option
+func WithAddr(addr string) Option {
+ return func(c *config) { c.addr = addr }
+}
+
+// WithTLSConfig returns tls config option
+func WithTLSConfig(conf *tls.Config) Option {
+ return func(c *config) { c.tlsConfig = conf }
+}
+
+func toGRPCConfig(ops ...Option) *config {
+ conf := &config{}
+ for _, op := range ops {
+ op(conf)
+ }
+ return conf
+}
diff --git a/syncer/grpc/server.go b/syncer/grpc/server.go
deleted file mode 100644
index f9aa83f..0000000
--- a/syncer/grpc/server.go
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package grpc
-
-import (
- "context"
- "crypto/tls"
- "net"
-
- "github.com/apache/servicecomb-service-center/pkg/gopool"
- "github.com/apache/servicecomb-service-center/pkg/log"
- pb "github.com/apache/servicecomb-service-center/syncer/proto"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
-)
-
-type GRPCHandler interface {
- Discovery() *pb.SyncData
-}
-type PullHandle func() *pb.SyncData
-
-// Server struct
-type Server struct {
- lsn net.Listener
- addr string
- handler GRPCHandler
- readyCh chan struct{}
- stopCh chan struct{}
- tlsConf *tls.Config
-}
-
-// NewServer new grpc server
-func NewServer(addr string, handler GRPCHandler, tlsConf *tls.Config) *Server {
- return &Server{
- addr: addr,
- handler: handler,
- readyCh: make(chan struct{}),
- stopCh: make(chan struct{}),
- tlsConf: tlsConf,
- }
-}
-
-// Provide consumers with an interface to pull data
-func (s *Server) Pull(ctx context.Context, in *pb.PullRequest) (*pb.SyncData, error) {
- return s.handler.Discovery(), nil
-}
-
-// Stop grpc server
-func (s *Server) Stop() {
- if s.lsn != nil {
- s.lsn.Close()
- s.lsn = nil
- }
-}
-
-// Start grpc server
-func (s *Server) Start(ctx context.Context) {
- lsn, err := net.Listen("tcp", s.addr)
- if err == nil {
- var svc *grpc.Server
- if s.tlsConf != nil {
- svc = grpc.NewServer(grpc.Creds(credentials.NewTLS(s.tlsConf)))
- } else {
- svc = grpc.NewServer()
- }
-
- pb.RegisterSyncServer(svc, s)
- s.lsn = lsn
- gopool.Go(func(ctx context.Context) {
- err = svc.Serve(s.lsn)
- })
- }
-
- if err != nil {
- log.Error("start grpc failed", err)
- close(s.stopCh)
- return
- }
- log.Info("start grpc success")
- close(s.readyCh)
-}
-
-// Ready Returns a channel that will be closed when grpc is ready
-func (s *Server) Ready() <-chan struct{} {
- return s.readyCh
-}
-
-// Error Returns a channel that will be closed a grpc is stopped
-func (s *Server) Stopped() <-chan struct{} {
- return s.stopCh
-}
diff --git a/syncer/pkg/utils/atomic_bool.go b/syncer/pkg/utils/atomic_bool.go
index 4336033..0ce466c 100644
--- a/syncer/pkg/utils/atomic_bool.go
+++ b/syncer/pkg/utils/atomic_bool.go
@@ -49,7 +49,7 @@
}
a.m.Lock()
- fn()
atomic.StoreUint32(&a.status, a.status^1)
+ fn()
a.m.Unlock()
}
diff --git a/syncer/server/convert.go b/syncer/server/convert.go
index 0ac5c8c..91b6375 100644
--- a/syncer/server/convert.go
+++ b/syncer/server/convert.go
@@ -25,6 +25,7 @@
"github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/etcd"
+ "github.com/apache/servicecomb-service-center/syncer/grpc"
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins"
"github.com/apache/servicecomb-service-center/syncer/serf"
@@ -69,6 +70,22 @@
}
}
+func convertGRPCOptions(c *config.Config) []grpc.Option {
+ opts := []grpc.Option{
+ grpc.WithAddr(c.Listener.RPCAddr),
+ }
+ if c.Listener.TLSMount.Enabled {
+ conf := c.GetTLSConfig(c.Listener.TLSMount.Name)
+ sslOps := append(tlsutil.DefaultServerTLSOptions(), tlsConfigToOptions(conf)...)
+ tlsConf, err := tlsutil.GetServerTLSConfig(sslOps...)
+ if err != nil {
+
+ }
+ opts = append(opts, grpc.WithTLSConfig(tlsConf))
+ }
+ return opts
+}
+
func convertTaskOptions(c *config.Config) []task.Option {
opts := make([]task.Option, 0, len(c.Task.Params))
for _, label := range c.Task.Params {
diff --git a/syncer/server/handler.go b/syncer/server/handler.go
index f00d8ba..c8a09fd 100644
--- a/syncer/server/handler.go
+++ b/syncer/server/handler.go
@@ -26,7 +26,7 @@
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/syncer/grpc"
+ "github.com/apache/servicecomb-service-center/syncer/client"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
@@ -51,9 +51,9 @@
}
}
-// Discovery discovery sync data from servicecenter
-func (s *Server) Discovery() *pb.SyncData {
- return s.servicecenter.Discovery()
+// Pull returns sync data of servicecenter
+func (s *Server) Pull(context.Context, *pb.PullRequest) (*pb.SyncData, error) {
+ return s.servicecenter.Discovery(), nil
}
// userEvent Handles "EventUser" notification events, no response required
@@ -94,7 +94,8 @@
}
}
- syncData, err := grpc.Pull(context.Background(), endpoint, tlsConfig)
+ cli := client.NewSyncClient(endpoint, tlsConfig)
+ syncData, err := cli.Pull(context.Background())
if err != nil {
log.Errorf(err, "Pull other serf instances failed, node name is '%s'", members[0].Name)
return
diff --git a/syncer/server/server.go b/syncer/server/server.go
index df0e23f..4a0941f 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -19,23 +19,24 @@
import (
"context"
- "crypto/tls"
"errors"
"strconv"
"syscall"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/tlsutil"
+ "github.com/apache/servicecomb-service-center/pkg/rpc"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/etcd"
"github.com/apache/servicecomb-service-center/syncer/grpc"
"github.com/apache/servicecomb-service-center/syncer/pkg/syssig"
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
"github.com/apache/servicecomb-service-center/syncer/serf"
"github.com/apache/servicecomb-service-center/syncer/servicecenter"
"github.com/apache/servicecomb-service-center/syncer/task"
+ ggrpc "google.golang.org/grpc"
// import plugins
_ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka"
@@ -180,6 +181,10 @@
return
}
+ rpc.RegisterService(func(svr *ggrpc.Server) {
+ pb.RegisterSyncServer(svr, s)
+ })
+
s.serf = serf.NewServer(convertSerfOptions(s.conf)...)
s.serf.OnceEventHandler(serf.NewEventHandler(serf.MemberJoinFilter(), s.waitClusterMembers))
@@ -201,18 +206,12 @@
return
}
- var tlsConfig *tls.Config
- if s.conf.Listener.TLSMount.Enabled {
- conf := s.conf.GetTLSConfig(s.conf.Listener.TLSMount.Name)
- sslOps := append(tlsutil.DefaultServerTLSOptions(), tlsConfigToOptions(conf)...)
- tlsConfig, err = tlsutil.GetServerTLSConfig(sslOps...)
- if err != nil {
- log.Error("get grpc server tls config failed", err)
- return
- }
+ s.grpc, err = grpc.NewServer(convertGRPCOptions(s.conf)...)
+ if err != nil {
+ log.Error("create grpc failed", err)
+ return
}
- s.grpc = grpc.NewServer(s.conf.Listener.RPCAddr, s, tlsConfig)
return nil
}