blob: 2eb673f4f0c50c7ae08487f09afa1abc170d5eb2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package health
import (
"errors"
"fmt"
"github.com/apache/servicecomb-mesher/proxy/config"
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/core/registry"
"github.com/go-chassis/go-chassis/pkg/runtime"
"github.com/go-mesh/openlogging"
"net"
"regexp"
"strconv"
"strings"
"time"
)
const (
DefaultInterval = time.Second * 30
DefaultTimeout = time.Second * 10
)
//Error definitions
var (
ErrPortEmpty = errors.New("port is empty")
ErrInvalidURI = errors.New("uri must start with /")
)
//Deal handle the unhealthy status
type Deal func(err error)
//L7Check is the interface for L7 checker
type L7Check func(check *config.HealthCheck, address string) error
//l7Checks save l7 check func
var l7Checks = make(map[string]L7Check)
//InstallChecker install checkers
func InstallChecker(n string, c L7Check) {
l7Checks[n] = c
}
//UpdateInstanceStatus update status in registrator, it just works in client side discovery
func UpdateInstanceStatus(err error) {
if registry.DefaultRegistrator == nil {
lager.Logger.Warn("Registrator is nil, can not update instance status")
return
}
if err != nil {
if runtime.InstanceStatus == runtime.StatusRunning {
lager.Logger.Info("service is not healthy, update status")
ChangeStatus(runtime.StatusDown)
}
} else {
if runtime.InstanceStatus == runtime.StatusDown {
lager.Logger.Info("service is healthy, update status")
ChangeStatus(runtime.StatusRunning)
}
}
}
//ChangeStatus change status in local and remote
func ChangeStatus(status string) {
if err := registry.DefaultRegistrator.UpdateMicroServiceInstanceStatus(runtime.ServiceID, runtime.InstanceID, status); err != nil {
lager.Logger.Error("update instance status failed:" + err.Error())
return
}
runtime.InstanceStatus = status
lager.Logger.Info("update instance status to: " + runtime.InstanceStatus)
}
//runCheckers run check routines
func runCheckers(c *config.HealthCheck, l7check L7Check, address string, deal Deal) (err error) {
var interval = DefaultInterval
if c.Interval != "" {
interval, err = time.ParseDuration(c.Interval)
if err != nil {
return err
}
}
ticker := time.NewTicker(interval)
go func() {
for range ticker.C {
err := CheckService(c, l7check, address)
if err != nil {
lager.Logger.Error(fmt.Sprintf("health check failed for service port[%s]: %s", c.Port, err))
}
deal(err)
}
}()
return nil
}
//CheckService check service health based on config
func CheckService(c *config.HealthCheck, l7check L7Check, address string) error {
lager.Logger.Debug(fmt.Sprintf("check port [%s]", c.Port))
if l7check != nil {
if err := l7check(c, address); err != nil {
return err
}
} else {
if err := L4Check(address); err != nil {
return err
}
}
lager.Logger.Debug("service is healthy: " + address)
return nil
}
//L4Check check tcp connection
func L4Check(address string) error {
c, err := net.DialTimeout("tcp", address, DefaultTimeout)
if err != nil {
return err
}
if err = c.Close(); err != nil {
return err
}
return nil
}
//Run Launch go routines to check service health
func Run() error {
openlogging.Info("local health manager start")
for _, v := range config.GetConfig().HealthCheck {
lager.Logger.Debug(fmt.Sprintf("check local health [%s],protocol [%s]", v.Port, v.Protocol))
address, check, err := ParseConfig(v)
if err != nil {
lager.Logger.Warn("Health keeper can not check health")
return err
}
//TODO make pluggable Deal
if err := runCheckers(v, check, address, UpdateInstanceStatus); err != nil {
return err
}
}
return nil
}
//ParseConfig validate config and return address, checker
//port name must not be empty
//port name must named as {protocol}-{name}
//protocol must has checker
func ParseConfig(c *config.HealthCheck) (string, L7Check, error) {
if c.Port == "" {
return "", nil, ErrPortEmpty
}
var check L7Check
if c.Protocol != "" {
var ok bool
check, ok = l7Checks[c.Protocol]
if !ok {
return "", nil, errors.New("don not support L7 checker:" + c.Protocol)
}
} else {
check = nil
}
address := "127.0.0.1:" + c.Port
if c.URI != "" {
if !strings.HasPrefix(c.URI, "/") {
return "", nil, ErrInvalidURI
}
}
if c.Match != nil {
if c.Match.Status != "" {
_, err := strconv.Atoi(c.Match.Status)
if err != nil {
return "", nil, err
}
}
if c.Match.Body != "" {
_, err := regexp.Compile(c.Match.Body)
if err != nil {
return "", nil, err
}
}
}
return address, check, nil
}
func init() {
InstallChecker("rest", HTTPCheck)
}