blob: 1a9a2f5d7d4d7e6b1e5a33f898414fd2e8372351 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"context"
"io"
"os"
"strings"
"syscall"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/etcd"
"github.com/apache/servicecomb-service-center/syncer/grpc"
"github.com/apache/servicecomb-service-center/syncer/pkg/syssig"
"github.com/apache/servicecomb-service-center/syncer/pkg/ticker"
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins"
"github.com/apache/servicecomb-service-center/syncer/serf"
"github.com/apache/servicecomb-service-center/syncer/servicecenter"
)
// Server is the syncer server. It holds the configuration together with the
// components that Run starts and Stop shuts down.
type Server struct {
// Syncer configuration, supplied through NewServer
conf *config.Config
// Ticker for Syncer, created in runTicker with conf.TickerInterval
tick *ticker.TaskTicker
// Wraps the servicecenter, created in createServiceCenter from conf.SCAddr
servicecenter servicecenter.Servicecenter
// Wraps the etcd agent; its Storage() is handed to the servicecenter
etcd *etcd.Agent
// Wraps the serf agent
agent *serf.Agent
// Wraps the grpc server, created in runGrpc with conf.RPCAddr
grpc *grpc.Server
}
// NewServer returns a Server that only carries the given configuration.
// No component is created or started until Run is called.
func NewServer(conf *config.Config) *Server {
	srv := new(Server)
	srv.conf = conf
	return srv
}
// Run initializes the plugins and then brings the components up in a fixed
// order: etcd, serf, grpc, the servicecenter wrapper and finally the ticker.
// The first step that fails aborts the start-up (each step logs its own
// error); when all steps succeed, control is handed to waitQuit with ctx.
func (s *Server) Run(ctx context.Context) {
	s.initPlugin()

	// Start-up steps, executed strictly in this order.
	steps := []func() error{
		s.runEtcd,
		s.runSerf,
		s.runGrpc,
		s.createServiceCenter,
		s.runTicker,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return
		}
	}

	s.waitQuit(ctx)
}
// Stop shuts down every component that has been created, in the order
// ticker, serf agent, grpc server, etcd agent, and finally closes the
// goroutine pool. Components that were never created (nil) are skipped.
func (s *Server) Stop() {
	if t := s.tick; t != nil {
		t.Stop()
	}

	if agent := s.agent; agent != nil {
		// Detach this server from serf events before leaving the cluster.
		agent.DeregisterEventHandler(s)
		// Leave the serf cluster.
		agent.Leave()
		// Close the serf agent itself.
		agent.Shutdown()
	}

	if g := s.grpc; g != nil {
		g.Stop()
	}

	if e := s.etcd; e != nil {
		e.Stop()
	}

	// Closes all goroutines in the pool
	gopool.CloseAndWait()
}
// initPlugin registers the configured servicecenter plugin under the
// PluginServicecenter name and then loads the plugins.
func (s *Server) initPlugin() {
	pluginName := plugins.PluginServicecenter.String()
	plugins.SetPluginConfig(pluginName, s.conf.ServicecenterPlugin)
	plugins.LoadPlugins()
}
// runEtcd creates the etcd agent with the default etcd configuration, stores
// it on the server and runs it. A run failure is logged and returned.
func (s *Server) runEtcd() (err error) {
	agent := etcd.NewAgent(etcd.DefaultConfig())
	s.etcd = agent

	err = agent.Run()
	if err != nil {
		log.Errorf(err, "Run etcd failed, %s", err)
	}
	return err
}
// runSerf creates the serf agent (logging to the configured log file, or
// to stderr as decided by createLogFile), registers the server as its event
// handler and starts it. When conf.JoinAddr is set, the agent also joins
// that address. Any failure is logged and returned.
func (s *Server) runSerf() (err error) {
	logWriter := createLogFile(s.conf.LogFile)
	s.agent, err = serf.Create(s.conf.Config, logWriter)
	if err != nil {
		log.Errorf(err, "Create serf failed, %s", err)
		return err
	}

	s.agent.RegisterEventHandler(s)
	err = s.agent.Start()
	if err != nil {
		log.Errorf(err, "Start serf failed, %s", err)
		return err
	}

	if addr := s.conf.JoinAddr; addr != "" {
		if _, err = s.agent.Join([]string{addr}, false); err != nil {
			log.Errorf(err, "Join serf cluster failed, %s", err)
		}
	}
	return err
}
// runGrpc creates the grpc server for conf.RPCAddr with this Server as its
// second argument, stores it and runs it. A run failure is logged and
// returned.
func (s *Server) runGrpc() (err error) {
	server := grpc.NewServer(s.conf.RPCAddr, s)
	s.grpc = server

	if err = server.Run(); err == nil {
		return nil
	}
	log.Errorf(err, "Run grpc failed, %s", err)
	return err
}
// createServiceCenter builds the servicecenter wrapper from the
// comma-separated addresses in conf.SCAddr and the etcd agent's storage.
// A creation failure is logged and returned.
func (s *Server) createServiceCenter() (err error) {
	addrs := strings.Split(s.conf.SCAddr, ",")
	s.servicecenter, err = servicecenter.NewServicecenter(addrs, s.etcd.Storage())
	if err == nil {
		return nil
	}
	log.Errorf(err, "Create service center failed, %s", err)
	return err
}
// runTicker creates the task ticker with conf.TickerInterval and
// s.tickHandler, stores it and starts it on the goroutine pool.
// It always returns nil.
func (s *Server) runTicker() error {
	tick := ticker.NewTaskTicker(s.conf.TickerInterval, s.tickHandler)
	s.tick = tick
	gopool.Go(tick.Start)
	return nil
}
// waitQuit registers Stop as the handler for the quit signals SIGINT and
// SIGTERM and then runs the signal handling via syssig.Run with ctx.
// If the handler cannot be registered, the failure is logged and waitQuit
// returns without running the signal handling.
//
// SIGKILL is intentionally not registered: the operating system never
// delivers it to the process, so a handler for it could never run.
func (s *Server) waitQuit(ctx context.Context) {
	err := syssig.AddSignalsHandler(s.Stop, syscall.SIGINT, syscall.SIGTERM)
	if err != nil {
		log.Errorf(err, "Syncer add signals handler failed")
		return
	}
	syssig.Run(ctx)
}
// createLogFile returns the writer to be used for logging. An empty logFile
// selects os.Stderr. Otherwise the file is opened through utils.OpenFile;
// if that fails, the error is logged and os.Stderr is returned instead.
func createLogFile(logFile string) (fw io.Writer) {
	if logFile != "" {
		file, err := utils.OpenFile(logFile)
		if err == nil {
			return file
		}
		log.Errorf(err, "Syncer open log file %s failed", logFile)
	}
	return os.Stderr
}