blob: 711fb5f030293de15a388fe1f42f7b7396304ac5 [file] [log] [blame]
//Copyright 2017 Huawei Technologies Co., Ltd
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
package api
import (
"fmt"
"github.com/ServiceComb/service-center/server/core"
"github.com/ServiceComb/service-center/server/core/mux"
pb "github.com/ServiceComb/service-center/server/core/proto"
rs "github.com/ServiceComb/service-center/server/rest"
"github.com/ServiceComb/service-center/server/rest/handlers"
"github.com/ServiceComb/service-center/util"
"github.com/ServiceComb/service-center/util/rest"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"net"
"net/http"
"strings"
"time"
)
// APIType identifies one of the API flavours the server can expose. It is used
// as the key of APIServerConfig.Endpoints.
type APIType int64
// APIServerConfig holds the settings the API server reads when it starts its
// listeners and registers itself.
type APIServerConfig struct {
// SSL selects the TLS variants of both the gRPC and the RESTful listener.
SSL bool
// VerifyClient is handed to rest.GetServerTLSConfig when SSL is enabled.
VerifyClient bool
// HostName is sent along with the instance registration request.
HostName string
// Endpoints maps an API type to the address that API listens on. An API
// type without an entry is not started.
Endpoints map[APIType]string
}
// APIServer runs the gRPC and RESTful API listeners and keeps the service
// center's own registration alive through periodic heartbeats.
type APIServer struct {
// Config is read by Start and the helpers it calls.
Config APIServerConfig
// grpcSvr is created by startGrpcServer; it stays nil when no gRPC
// endpoint is configured.
grpcSvr *grpc.Server
// isClose is true while the server is stopped (the initial state).
isClose bool
// err carries failures to the receiver of Err(); Stop closes it.
err chan error
}
// Supported API types, usable as keys of APIServerConfig.Endpoints.
const (
	// GRPC selects the gRPC endpoint (value 0).
	GRPC APIType = iota
	// REST selects the RESTful HTTP endpoint (value 1).
	REST
)
// Err returns the receive-only view of the server's error channel. Start-up
// and serving failures are sent on it, and Stop closes it.
func (s *APIServer) Err() <-chan error {
return s.err
}
// startGrpcServer creates the gRPC server, registers the service and instance
// controllers on it and serves it in a background goroutine.
//
// It is a no-op when no GRPC endpoint is configured. When Config.SSL is set the
// server is created with TLS credentials. Failures to build the TLS config, to
// listen, or to serve (unless the server has been stopped) are logged and sent
// on s.err.
func (s *APIServer) startGrpcServer() {
	addr, configured := s.Config.Endpoints[GRPC]
	if !configured {
		return
	}

	// Collect server options first so the server is constructed in one place.
	var opts []grpc.ServerOption
	if s.Config.SSL {
		tlsConfig, tlsErr := rest.GetServerTLSConfig(s.Config.VerifyClient)
		if tlsErr != nil {
			util.LOGGER.Error("error to get server tls config", tlsErr)
			s.err <- tlsErr
			return
		}
		opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
	}
	s.grpcSvr = grpc.NewServer(opts...)

	pb.RegisterServiceCtrlServer(s.grpcSvr, rs.ServiceAPI)
	pb.RegisterServiceInstanceCtrlServer(s.grpcSvr, rs.InstanceAPI)

	util.LOGGER.Infof("listen on server %s", addr)
	listener, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		util.LOGGER.Error("error to start Grpc API server "+addr, listenErr)
		s.err <- listenErr
		return
	}

	go func() {
		serveErr := s.grpcSvr.Serve(listener)
		// Serve also returns when Stop shuts the server down; that is not a
		// failure and must not be reported.
		if s.isClose {
			return
		}
		util.LOGGER.Error("error to start Grpc API server "+addr, serveErr)
		s.err <- serveErr
	}()
}
// startRESTfulServer installs the default handler on "/" and serves the REST
// endpoint in a background goroutine, using TLS when Config.SSL is set.
//
// It is a no-op when no REST endpoint is configured. If serving ends while the
// server has not been stopped, the error is logged and sent on s.err.
func (s *APIServer) startRESTfulServer() {
	addr, configured := s.Config.Endpoints[REST]
	if !configured {
		return
	}

	http.Handle("/", handlers.DefaultServerHandler())

	go func() {
		var serveErr error
		if s.Config.SSL {
			serveErr = rest.ListenAndServeTLS(addr, nil)
		} else {
			serveErr = rest.ListenAndServe(addr, nil)
		}
		// Returning because of Stop is expected; only report real failures.
		if s.isClose {
			return
		}
		util.LOGGER.Error("error to start RESTful API server "+addr, serveErr)
		s.err <- serveErr
	}()
}
// registerServiceCenter registers the service center with itself: first the
// service record, then the instance record. The first error encountered is
// returned and the instance step is skipped if the service step fails.
func (s *APIServer) registerServiceCenter() error {
	if err := s.registryService(); err != nil {
		return err
	}
	// Instance information.
	return s.registryInstance()
}
// registryService makes sure the service center's own service record exists
// and stores its identity in core.Service.
//
// If the record already exists it is fetched and assigned to core.Service;
// otherwise it is created and the new id is stored in core.Service.ServiceId.
// The whole operation runs under the global process lock. It returns an error
// when the lock cannot be taken or when any of the registry calls fails.
func (s *APIServer) registryService() error {
	// In a distributed deployment every service center node races to register
	// itself, which would create several copies of the static service record;
	// a distributed lock serialises the nodes.
	lock, err := mux.Lock(mux.PROCESS_LOCK)
	if err != nil {
		util.LOGGER.Errorf(err, "could not create global lock %s", mux.PROCESS_LOCK)
		return err
	}
	defer lock.Unlock()

	ctx := core.AddDefaultContextValue(context.TODO())
	respE, err := rs.ServiceAPI.Exist(ctx, core.GetExistenceRequest())
	if err != nil {
		util.LOGGER.Error("query service center existence failed", err)
		return err
	}

	if respE.Response.Code == pb.Response_SUCCESS {
		util.LOGGER.Warnf(nil, "service center service already registered, service id %s", respE.ServiceId)
		respG, err := rs.ServiceAPI.GetOne(ctx, core.GetServiceRequest(respE.ServiceId))
		if err != nil {
			// Check the call error before touching respG, which may be nil.
			return fmt.Errorf("query service center service info failed, service id %s(%s)",
				respE.ServiceId, err)
		}
		// Validate the GetOne response itself; respE is already known to be
		// successful in this branch.
		if respG.Response.Code != pb.Response_SUCCESS {
			return fmt.Errorf("query service center service info failed, service id %s(%s)",
				respE.ServiceId, respG.Response.Message)
		}
		core.Service = respG.Service
		return nil
	}

	respS, err := rs.ServiceAPI.Create(ctx, core.CreateServiceRequest())
	if err != nil {
		util.LOGGER.Error("register service center failed", err)
		return err
	}
	core.Service.ServiceId = respS.ServiceId
	// Log the id that was just created, not the one from the failed lookup.
	util.LOGGER.Infof("register service center service successfully, service id %s", respS.ServiceId)
	return nil
}
// registryInstance registers this process as an instance of the service
// center service and stores the returned id in core.Instance.InstanceId.
//
// The instance advertises one "grpc://" and/or "rest://" endpoint for every
// address configured in Config.Endpoints. It returns an error when the
// register call fails or the response code is not Response_SUCCESS.
func (s *APIServer) registryInstance() error {
	core.Instance.ServiceId = core.Service.ServiceId

	endpoints := []string{}
	if address, ok := s.Config.Endpoints[GRPC]; ok {
		endpoints = append(endpoints, strings.Join([]string{"grpc", address}, "://"))
	}
	if address, ok := s.Config.Endpoints[REST]; ok {
		endpoints = append(endpoints, strings.Join([]string{"rest", address}, "://"))
	}

	ctx := core.AddDefaultContextValue(context.TODO())
	respI, err := rs.InstanceAPI.Register(ctx,
		core.RegisterInstanceRequest(s.Config.HostName, endpoints))
	if err != nil {
		// Handle the call error first: the response may be nil here, and the
		// original error must not be replaced by a less specific one.
		util.LOGGER.Error("register service center instance failed", err)
		return err
	}
	if respI.GetResponse().Code != pb.Response_SUCCESS {
		err = fmt.Errorf("register service center instance failed, %s", respI.GetResponse().Message)
		util.LOGGER.Error(err.Error(), nil)
		return err
	}

	core.Instance.InstanceId = respI.InstanceId
	util.LOGGER.Infof("register service center instance successfully, instance %s/%s, endpoints %s",
		core.Service.ServiceId, respI.InstanceId, endpoints)
	return nil
}
// unregisterInstance removes this process's instance record from the registry.
//
// It does nothing when no instance id has been recorded. It returns an error
// when the unregister call fails or the response code is not Response_SUCCESS.
func (s *APIServer) unregisterInstance() error {
	if len(core.Instance.InstanceId) == 0 {
		return nil
	}

	ctx := core.AddDefaultContextValue(context.TODO())
	respI, err := rs.InstanceAPI.Unregister(ctx, core.UnregisterInstanceRequest())
	if err != nil {
		// Handle the call error first: the response may be nil here, and the
		// original error must not be replaced by a less specific one.
		util.LOGGER.Error("unregister service center instance failed", err)
		return err
	}
	if respI.GetResponse().Code != pb.Response_SUCCESS {
		err = fmt.Errorf("unregister service center instance failed, %s", respI.GetResponse().Message)
		util.LOGGER.Error(err.Error(), nil)
		return err
	}

	util.LOGGER.Warnf(nil, "unregister service center instance successfully, %s/%s",
		core.Service.ServiceId, core.Instance.InstanceId)
	return nil
}
// doAPIServerHeartBeat sends one heartbeat for the service center's own
// instance. When the heartbeat does not succeed it logs the failure and runs
// the full self-registration again. It does nothing once the server is closed.
func (s *APIServer) doAPIServerHeartBeat() {
	if s.isClose {
		return
	}

	ctx := core.AddDefaultContextValue(context.TODO())
	respI, err := rs.InstanceAPI.Heartbeat(ctx, core.HeartbeatRequest())
	// Only look at the response when the call itself succeeded; on error the
	// response may be nil.
	if err == nil && respI.GetResponse().Code == pb.Response_SUCCESS {
		util.LOGGER.Debugf("update service center %s heartbeat %s successfully",
			core.Instance.ServiceId, core.Instance.InstanceId)
		return
	}
	util.LOGGER.Errorf(err, "update service center %s instance %s heartbeat failed",
		core.Instance.ServiceId, core.Instance.InstanceId)

	// The heartbeat failed: treat the registration as gone and create it again.
	err = s.registerServiceCenter()
	if err != nil {
		util.LOGGER.Errorf(err, "Service %s/%s/%s does not exist, and retry to register it failed.",
			core.Service.AppId, core.Service.ServiceName, core.Service.Version)
	}
}
// startHeartBeatService launches a goroutine that sends a heartbeat every
// core.Instance.HealthCheck.Interval seconds. The goroutine ends when s.err is
// closed (which Stop does) or when an error shows up on s.err.
func (s *APIServer) startHeartBeatService() {
	go func() {
		for {
			select {
			case err, ok := <-s.err:
				// s.err is the same channel Err() hands to the caller. If a
				// real error (not the close signal) was received here, put it
				// back so the caller still gets to see it. The send is
				// non-blocking: if another error has been queued meanwhile the
				// caller is notified anyway. Like the other senders in this
				// file, skip the send once the server is closed.
				if ok && !s.isClose {
					select {
					case s.err <- err:
					default:
					}
				}
				return
			case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
				s.doAPIServerHeartBeat()
			}
		}
	}()
}
// Start registers the service center with itself, starts the RESTful and gRPC
// listeners and begins sending heartbeats. It does nothing if the server is
// already running.
//
// It must only be called after ETCD has started successfully.
//
// If self-registration fails, the error is sent on the Err() channel and Start
// returns without starting the listeners. The server is still marked as
// running, so a later Stop closes the error channel as usual.
func (s *APIServer) Start() {
	if !s.isClose {
		return
	}
	s.isClose = false

	// Self-registration.
	if err := s.registerServiceCenter(); err != nil {
		s.err <- err
		// Do not continue: the error channel holds a single value, so another
		// start-up failure sent synchronously below would block Start forever,
		// and reporting "ready" after a failed registration is misleading.
		return
	}

	s.startRESTfulServer()
	s.startGrpcServer()

	// Heartbeat.
	s.startHeartBeatService()

	util.LOGGER.Info("api server is ready")
}
// Stop shuts the API server down. It does nothing if the server is already
// stopped. In order, it unregisters the instance, marks the server closed,
// closes the RESTful server, gracefully stops the gRPC server if one was
// created, and closes the error channel.
func (s *APIServer) Stop() {
	if s.isClose {
		return
	}

	// The result is not inspected here; shutdown proceeds regardless.
	s.unregisterInstance()

	// Mark closed before tearing the listeners down so their serving
	// goroutines do not report the shutdown as an error.
	s.isClose = true

	rest.CloseServer()
	if grpcSvr := s.grpcSvr; grpcSvr != nil {
		grpcSvr.GracefulStop()
	}

	close(s.err)

	util.LOGGER.Info("api server stopped.")
}
// apiServer is the package-level server instance returned by GetAPIServer.
var apiServer *APIServer

// init creates the server in the stopped state with an error channel that can
// buffer a single error.
func init() {
	apiServer = new(APIServer)
	apiServer.isClose = true
	apiServer.err = make(chan error, 1)
}
// GetAPIServer returns the package-level APIServer instance created in init.
func GetAPIServer() *APIServer {
return apiServer
}