| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package server |
| |
| import ( |
| "context" |
| "errors" |
| "strconv" |
| "syscall" |
| |
| "github.com/apache/servicecomb-service-center/pkg/gopool" |
| "github.com/apache/servicecomb-service-center/pkg/log" |
| "github.com/apache/servicecomb-service-center/pkg/rpc" |
| "github.com/apache/servicecomb-service-center/syncer/config" |
| "github.com/apache/servicecomb-service-center/syncer/etcd" |
| "github.com/apache/servicecomb-service-center/syncer/grpc" |
| "github.com/apache/servicecomb-service-center/syncer/pkg/syssig" |
| "github.com/apache/servicecomb-service-center/syncer/pkg/utils" |
| "github.com/apache/servicecomb-service-center/syncer/plugins" |
| pb "github.com/apache/servicecomb-service-center/syncer/proto" |
| "github.com/apache/servicecomb-service-center/syncer/serf" |
| "github.com/apache/servicecomb-service-center/syncer/servicecenter" |
| "github.com/apache/servicecomb-service-center/syncer/task" |
| ggrpc "google.golang.org/grpc" |
| |
| // import plugins |
| _ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka" |
| |
| // import task |
| _ "github.com/apache/servicecomb-service-center/syncer/task/idle" |
| _ "github.com/apache/servicecomb-service-center/syncer/task/ticker" |
| ) |
| |
| var stopChanErr = errors.New("stopped syncer by stopCh") |
| |
| type moduleServer interface { |
| // Starts launches the module server, the returned is not guaranteed that the server is ready |
| // The moduleServer.Ready() channel will be transmit a message when server completed |
| Start(ctx context.Context) |
| |
| // Returns a channel that will be closed when the module server is ready |
| Ready() <-chan struct{} |
| |
| // Returns a channel that will be closed when the module server is stopped |
| Stopped() <-chan struct{} |
| } |
| |
| // Server struct for syncer |
| type Server struct { |
| ctx context.Context |
| cancel context.CancelFunc |
| // Syncer configuration |
| conf *config.Config |
| |
| // task for discovery |
| task task.Tasker |
| |
| // Wrap the servicecenter |
| servicecenter servicecenter.Servicecenter |
| |
| etcd *etcd.Server |
| |
| // Wraps the serf agent |
| serf *serf.Server |
| |
| // Wraps the grpc server |
| grpc *grpc.Server |
| |
| // The channel will be closed when receiving a system interrupt signal |
| stopCh chan struct{} |
| } |
| |
| // NewServer new server with Config |
| func NewServer(conf *config.Config) *Server { |
| ctx, cancel := context.WithCancel(context.Background()) |
| return &Server{ |
| ctx: ctx, |
| cancel: cancel, |
| conf: conf, |
| stopCh: make(chan struct{}), |
| } |
| } |
| |
| // Run syncer Server |
| func (s *Server) Run(ctx context.Context) { |
| var err error |
| s.initPlugin() |
| if err = s.initialization(); err != nil { |
| return |
| } |
| |
| // Start system signal listening, wait for user interrupt program |
| gopool.Go(syssig.Run) |
| |
| err = s.startModuleServer(s.serf) |
| if err != nil { |
| return |
| } |
| |
| err = s.startModuleServer(s.etcd) |
| if err != nil { |
| return |
| } |
| |
| err = s.startModuleServer(s.grpc) |
| if err != nil { |
| return |
| } |
| |
| s.servicecenter.SetStorageEngine(s.etcd.Storage()) |
| |
| s.task.Handle(s.tickHandler) |
| |
| s.task.Run(ctx) |
| |
| log.Info("start service done") |
| |
| <-s.stopCh |
| |
| s.Stop() |
| return |
| } |
| |
| // Stop Syncer Server |
| func (s *Server) Stop() { |
| if s.serf != nil { |
| //stop serf agent |
| s.serf.Stop() |
| } |
| |
| if s.grpc != nil { |
| s.grpc.Stop() |
| } |
| |
| if s.etcd != nil { |
| s.etcd.Stop() |
| } |
| |
| s.cancel() |
| |
| // Closes all goroutines in the pool |
| gopool.CloseAndWait() |
| } |
| |
| func (s *Server) startModuleServer(module moduleServer) (err error) { |
| gopool.Go(module.Start) |
| select { |
| case <-module.Ready(): |
| return nil |
| case <-module.Stopped(): |
| case <-s.stopCh: |
| } |
| s.Stop() |
| return stopChanErr |
| } |
| |
| // initialization Initialize the starter of the syncer |
| func (s *Server) initialization() (err error) { |
| err = syssig.AddSignalsHandler(func() { |
| log.Info("close svr stop chan") |
| close(s.stopCh) |
| }, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM) |
| if err != nil { |
| log.Error("listen system signal failed", err) |
| return |
| } |
| |
| rpc.RegisterService(func(svr *ggrpc.Server) { |
| pb.RegisterSyncServer(svr, s) |
| }) |
| |
| s.serf = serf.NewServer(convertSerfOptions(s.conf)...) |
| s.serf.OnceEventHandler(serf.NewEventHandler(serf.MemberJoinFilter(), s.waitClusterMembers)) |
| |
| s.etcd, err = etcd.NewServer(convertEtcdOptions(s.conf)...) |
| if err != nil { |
| log.Errorf(err, "Create etcd failed, %s", err) |
| return |
| } |
| |
| s.task, err = task.GenerateTasker(s.conf.Task.Kind, convertTaskOptions(s.conf)...) |
| if err != nil { |
| log.Errorf(err, "Create tasker failed, %s", err) |
| return |
| } |
| |
| s.servicecenter, err = servicecenter.NewServicecenter(convertSCConfigOption(s.conf)...) |
| if err != nil { |
| log.Error("create servicecenter failed", err) |
| return |
| } |
| |
| s.grpc, err = grpc.NewServer(convertGRPCOptions(s.conf)...) |
| if err != nil { |
| log.Error("create grpc failed", err) |
| return |
| } |
| |
| return nil |
| } |
| |
| // initPlugin Initialize the plugin and load the external plugin according to the configuration |
| func (s *Server) initPlugin() { |
| plugins.SetPluginConfig(plugins.PluginServicecenter.String(), s.conf.Registry.Plugin) |
| plugins.LoadPlugins() |
| } |
| |
| func (s *Server) waitClusterMembers(data ...[]byte) bool { |
| if s.conf.Mode == config.ModeCluster { |
| tags := map[string]string{tagKeyClusterName: s.conf.Cluster} |
| if len(s.serf.MembersByTags(tags)) < groupExpect { |
| return false |
| } |
| err := s.configureCluster() |
| if err != nil { |
| log.Error("configure cluster failed", err) |
| s.Stop() |
| return false |
| } |
| } |
| s.serf.AddEventHandler(serf.NewEventHandler(serf.UserEventFilter(EventDiscovered), s.userEvent)) |
| return true |
| } |
| |
| // configureCluster Configuring the cluster by serf group member information |
| func (s *Server) configureCluster() error { |
| // get local member of serf |
| self := s.serf.LocalMember() |
| _, peerPort, _ := utils.SplitAddress(s.conf.Listener.PeerAddr) |
| ops := []etcd.Option{etcd.WithPeerAddr(self.Addr.String() + ":" + strconv.Itoa(peerPort))} |
| |
| // group members from serf as initial cluster members |
| tags := map[string]string{tagKeyClusterName: s.conf.Cluster} |
| for _, member := range s.serf.MembersByTags(tags) { |
| ops = append(ops, etcd.WithAddPeers(member.Name, member.Addr.String()+":"+member.Tags[tagKeyClusterPort])) |
| } |
| |
| return s.etcd.AddOptions(ops...) |
| } |