blob: 4a57a9db5078b7ba4c5ac80982d60a2da2f492b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package disco
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
apt "github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/health"
quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
"github.com/apache/servicecomb-service-center/server/service/validator"
pb "github.com/go-chassis/cari/discovery"
)
// defaultMinInterval is the fallback handed to config.GetDuration when the
// "registry.instance.minInterval" key is looked up.
const defaultMinInterval = 5 * time.Second

// defaultMinTimes is the fallback handed to config.GetInt when the
// "registry.instance.minTimes" key is looked up.
const defaultMinTimes = 3
// RegisterInstance validates the request, runs the instance quota check,
// fills in the instance's default values and finally hands the request to the
// metadata manager.
//
// A request that fails validation is answered with a pb.ErrInvalidParams
// response and a nil error. A quota or default-population failure is converted
// with datasource.WrapErrResponse, and the error that call yields is returned
// alongside the response.
func RegisterInstance(ctx context.Context, in *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) {
	if invalid := validator.Validate(in); invalid != nil {
		operator := util.GetIPFromContext(ctx)
		log.Error(fmt.Sprintf("register instance failed, invalid parameters, operator %s", operator), invalid)
		rejected := &pb.RegisterInstanceResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, invalid.Error()),
		}
		return rejected, nil
	}

	operator := util.GetIPFromContext(ctx)

	// fail turns a failure cause into the response/error pair returned to the caller.
	fail := func(cause error) (*pb.RegisterInstanceResponse, error) {
		wrapped, wrapErr := datasource.WrapErrResponse(cause)
		return &pb.RegisterInstanceResponse{Response: wrapped}, wrapErr
	}

	if exceeded := checkInstanceQuota(ctx); exceeded != nil {
		inst := in.Instance
		msg := fmt.Sprintf("register instance failed, endpoints %v, host '%s', serviceID %s, operator %s",
			inst.Endpoints, inst.HostName, inst.ServiceId, operator)
		log.Error(msg, exceeded)
		return fail(exceeded)
	}

	if cause := populateInstanceDefaultValue(ctx, in.Instance); cause != nil {
		return fail(cause)
	}

	return datasource.GetMetadataManager().RegisterInstance(ctx, in)
}
// populateInstanceDefaultValue fills in the fields of instance that are set at
// registration time:
//
//   - Status defaults to pb.MSI_UP when empty.
//   - Timestamp and ModTimestamp are both set to the current Unix time.
//   - HealthCheck is created with the datasource lease defaults when absent,
//     raised to the configured minimum interval/times in heartbeat mode, and
//     overwritten with the datasource lease defaults in platform mode.
//   - Version is copied from the service identified by instance.ServiceId.
//
// It returns a pb.ErrServiceNotExists error when the service lookup fails; the
// underlying lookup error is logged because it is not part of the returned
// error.
func populateInstanceDefaultValue(ctx context.Context, instance *pb.MicroServiceInstance) error {
	if len(instance.Status) == 0 {
		instance.Status = pb.MSI_UP
	}

	instance.Timestamp = strconv.FormatInt(time.Now().Unix(), 10)
	instance.ModTimestamp = instance.Timestamp

	// Timing here should be driven by the lease.
	// The HealthCheck object is only used to present the service's health-check
	// logic; a CHECK_BY_PLATFORM mode means a sidecar sends heartbeats on behalf
	// of the instance, and the instance times out after 120s.
	switch {
	case instance.HealthCheck == nil:
		instance.HealthCheck = &pb.HealthCheck{
			Mode:     pb.CHECK_BY_HEARTBEAT,
			Interval: datasource.DefaultLeaseRenewalInterval,
			Times:    datasource.DefaultLeaseRetryTimes,
		}
	case instance.HealthCheck.Mode == pb.CHECK_BY_HEARTBEAT:
		// Never accept a heartbeat interval or retry count below the configured minimum.
		renewalInterval := int32(config.GetDuration("registry.instance.minInterval", defaultMinInterval) / time.Second)
		if instance.HealthCheck.Interval < renewalInterval {
			instance.HealthCheck.Interval = renewalInterval
		}
		retryTimes := int32(config.GetInt("registry.instance.minTimes", defaultMinTimes))
		if instance.HealthCheck.Times < retryTimes {
			instance.HealthCheck.Times = retryTimes
		}
	case instance.HealthCheck.Mode == pb.CHECK_BY_PLATFORM:
		instance.HealthCheck.Interval = datasource.DefaultLeaseRenewalInterval
		instance.HealthCheck.Times = datasource.DefaultLeaseRetryTimes
	}

	microservice, err := GetService(ctx, &pb.GetServiceRequest{ServiceId: instance.ServiceId})
	if err != nil {
		// The caller only ever sees the generic "service not exists" error
		// below, so record the real cause here before it is dropped.
		log.Error(fmt.Sprintf("populate instance default value failed, get service %s failed",
			instance.ServiceId), err)
		return pb.NewError(pb.ErrServiceNotExists, "Invalid 'serviceID' in request body.")
	}
	instance.Version = microservice.Version
	return nil
}
// UnregisterInstance validates the request and, when it is valid, forwards it
// to the metadata manager.
//
// An invalid request is logged together with the value of
// util.GetIPFromContext(ctx) and answered with a pb.ErrInvalidParams response
// and a nil error.
func UnregisterInstance(ctx context.Context,
	in *pb.UnregisterInstanceRequest) (*pb.UnregisterInstanceResponse, error) {
	invalid := validator.Validate(in)
	if invalid != nil {
		operator := util.GetIPFromContext(ctx)
		msg := fmt.Sprintf("unregister instance failed, invalid parameters, operator %s", operator)
		log.Error(msg, invalid)

		rejected := &pb.UnregisterInstanceResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, invalid.Error()),
		}
		return rejected, nil
	}

	return datasource.GetMetadataManager().UnregisterInstance(ctx, in)
}
// Heartbeat validates the request and, when it is valid, forwards it to the
// metadata manager.
//
// An invalid request is logged together with the value of
// util.GetIPFromContext(ctx) and answered with a pb.ErrInvalidParams response
// and a nil error.
func Heartbeat(ctx context.Context, in *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
	invalid := validator.Validate(in)
	if invalid != nil {
		operator := util.GetIPFromContext(ctx)
		msg := fmt.Sprintf("heartbeat failed, invalid parameters, operator %s", operator)
		log.Error(msg, invalid)

		rejected := &pb.HeartbeatResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, invalid.Error()),
		}
		return rejected, nil
	}

	return datasource.GetMetadataManager().Heartbeat(ctx, in)
}
// HeartbeatSet forwards a batch heartbeat request to the metadata manager.
//
// A request whose Instances list is empty is logged and answered with a
// pb.ErrInvalidParams response and a nil error; no other validation is done
// in this function.
func HeartbeatSet(ctx context.Context,
	in *pb.HeartbeatSetRequest) (*pb.HeartbeatSetResponse, error) {
	if len(in.Instances) == 0 {
		log.Error("heartbeats failed, invalid request. Body not contain Instances or is empty", nil)

		rejected := &pb.HeartbeatSetResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, "Request format invalid."),
		}
		return rejected, nil
	}

	return datasource.GetMetadataManager().HeartbeatSet(ctx, in)
}
// GetOneInstance validates the request and, when it is valid, asks the
// metadata manager for the instance via GetInstance.
//
// An invalid request is logged and answered with a pb.ErrInvalidParams
// response and a nil error.
func GetOneInstance(ctx context.Context,
	in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
	if invalid := validator.Validate(in); invalid != nil {
		log.Error("get instance failed: invalid parameters", invalid)

		rejected := &pb.GetOneInstanceResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, invalid.Error()),
		}
		return rejected, nil
	}

	return datasource.GetMetadataManager().GetInstance(ctx, in)
}
// GetInstances validates the request and, when it is valid, forwards it to
// the metadata manager.
//
// An invalid request is logged and answered with a pb.ErrInvalidParams
// response and a nil error.
func GetInstances(ctx context.Context, in *pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) {
	if invalid := validator.Validate(in); invalid != nil {
		log.Error("get instances failed: invalid parameters", invalid)

		rejected := &pb.GetInstancesResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, invalid.Error()),
		}
		return rejected, nil
	}

	return datasource.GetMetadataManager().GetInstances(ctx, in)
}
// FindInstances validates the request and, when it is valid, forwards it to
// the metadata manager.
//
// An invalid request is logged and answered with a pb.ErrInvalidParams
// response and a nil error.
func FindInstances(ctx context.Context, in *pb.FindInstancesRequest) (*pb.FindInstancesResponse, error) {
	if invalid := validator.Validate(in); invalid != nil {
		log.Error("find instance failed: invalid parameters", invalid)

		rejected := &pb.FindInstancesResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, invalid.Error()),
		}
		return rejected, nil
	}

	return datasource.GetMetadataManager().FindInstances(ctx, in)
}
// BatchFindInstances forwards a batch lookup to the metadata manager's
// BatchFind after two checks: the request must name at least one service or
// instance, and it must pass validator.Validate.
//
// A request failing either check is logged and answered with a
// pb.ErrInvalidParams response and a nil error.
func BatchFindInstances(ctx context.Context, in *pb.BatchFindInstancesRequest) (*pb.BatchFindInstancesResponse, error) {
	// reject logs the cause and builds the invalid-parameters reply shared by both checks.
	reject := func(cause error) (*pb.BatchFindInstancesResponse, error) {
		log.Error("batch find instance failed: invalid parameters", cause)
		return &pb.BatchFindInstancesResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, cause.Error()),
		}, nil
	}

	if len(in.Services)+len(in.Instances) == 0 {
		// This text is sent back in the response, so it is kept verbatim.
		return reject(errors.New("Required services or instances"))
	}

	if invalid := validator.Validate(in); invalid != nil {
		return reject(invalid)
	}

	return datasource.GetMetadataManager().BatchFind(ctx, in)
}
// UpdateInstanceStatus validates the request and, when it is valid, forwards
// it to the metadata manager.
//
// An invalid request is logged, identified by "serviceID/instanceID/status"
// and accompanied by the validation error, and answered with a
// pb.ErrInvalidParams response and a nil error.
func UpdateInstanceStatus(ctx context.Context, in *pb.UpdateInstanceStatusRequest) (*pb.UpdateInstanceStatusResponse, error) {
	if err := validator.Validate(in); err != nil {
		updateStatusFlag := util.StringJoin([]string{in.ServiceId, in.InstanceId, in.Status}, "/")
		// Pass the validation error to the logger so the reason for the
		// rejection is recorded, as the other handlers in this file do.
		log.Error(fmt.Sprintf("update instance[%s] status failed", updateStatusFlag), err)
		return &pb.UpdateInstanceStatusResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, err.Error()),
		}, nil
	}
	return datasource.GetMetadataManager().UpdateInstanceStatus(ctx, in)
}
// UpdateInstanceProperties validates the request and, when it is valid,
// forwards it to the metadata manager.
//
// An invalid request is logged, identified by "serviceID/instanceID" and
// accompanied by the validation error, and answered with a
// pb.ErrInvalidParams response and a nil error.
func UpdateInstanceProperties(ctx context.Context, in *pb.UpdateInstancePropsRequest) (*pb.UpdateInstancePropsResponse, error) {
	if err := validator.Validate(in); err != nil {
		instanceFlag := util.StringJoin([]string{in.ServiceId, in.InstanceId}, "/")
		// Pass the validation error to the logger so the reason for the
		// rejection is recorded, as the other handlers in this file do.
		log.Error(fmt.Sprintf("update instance[%s] properties failed", instanceFlag), err)
		return &pb.UpdateInstancePropsResponse{
			Response: pb.CreateResponse(pb.ErrInvalidParams, err.Error()),
		}, nil
	}
	return datasource.GetMetadataManager().UpdateInstanceProperties(ctx, in)
}
// ClusterHealth reports the health of the service center together with the
// instances registered for the service described by apt.Service.
//
// The steps are:
//  1. Ask health.GlobalHealthChecker; if it reports an error, answer with
//     pb.ErrUnhealthy and a nil error.
//  2. On a clone of ctx scoped to datasource.RegistryDomain and
//     datasource.RegistryProject, look up the service ID of apt.Service.
//     A lookup error yields pb.ErrInternal plus that error; an empty service
//     ID yields pb.ErrServiceNotExists and a nil error.
//  3. Fetch that service's instances. A fetch error yields pb.ErrInternal
//     plus that error; otherwise the instances are returned with a success
//     response.
func ClusterHealth(ctx context.Context) (*pb.GetInstancesResponse, error) {
	if unhealthy := health.GlobalHealthChecker().Healthy(); unhealthy != nil {
		return &pb.GetInstancesResponse{
			Response: pb.CreateResponse(pb.ErrUnhealthy, unhealthy.Error()),
		}, nil
	}

	registryCtx := util.SetDomainProject(util.CloneContext(ctx), datasource.RegistryDomain, datasource.RegistryProject)

	existReq := &pb.GetExistenceRequest{
		Type:        pb.ExistenceMicroservice,
		AppId:       apt.Service.AppId,
		Environment: apt.Service.Environment,
		ServiceName: apt.Service.ServiceName,
		Version:     apt.Service.Version,
	}
	existResp, lookupErr := datasource.GetMetadataManager().ExistService(registryCtx, existReq)
	if lookupErr != nil {
		msg := fmt.Sprintf("health check failed: get service center[%s/%s/%s/%s]'s serviceID failed",
			apt.Service.Environment, apt.Service.AppId, apt.Service.ServiceName, apt.Service.Version)
		log.Error(msg, lookupErr)
		return &pb.GetInstancesResponse{
			Response: pb.CreateResponse(pb.ErrInternal, lookupErr.Error()),
		}, lookupErr
	}

	if len(existResp.ServiceId) == 0 {
		msg := fmt.Sprintf("health check failed: service center[%s/%s/%s/%s]'s serviceID does not exist",
			apt.Service.Environment, apt.Service.AppId, apt.Service.ServiceName, apt.Service.Version)
		log.Error(msg, nil)
		return &pb.GetInstancesResponse{
			Response: pb.CreateResponse(pb.ErrServiceNotExists, "ServiceCenter's serviceID not exist."),
		}, nil
	}

	listReq := &pb.GetInstancesRequest{
		ProviderServiceId: existResp.ServiceId,
	}
	listResp, listErr := datasource.GetMetadataManager().GetInstances(registryCtx, listReq)
	if listErr != nil {
		msg := fmt.Sprintf("health check failed: get service center[%s][%s/%s/%s/%s]'s instances failed",
			existResp.ServiceId, apt.Service.Environment, apt.Service.AppId, apt.Service.ServiceName, apt.Service.Version)
		log.Error(msg, listErr)
		return &pb.GetInstancesResponse{
			Response: pb.CreateResponse(pb.ErrInternal, listErr.Error()),
		}, listErr
	}

	healthy := &pb.GetInstancesResponse{
		Response:  pb.CreateResponse(pb.ResponseSuccess, "Health check successfully."),
		Instances: listResp.Instances,
	}
	return healthy, nil
}
// checkInstanceQuota returns nil without touching the quota service when
// apt.IsSCInstance reports true for ctx. In every other case it returns the
// result of quotasvc.ApplyInstance(ctx, 1).
func checkInstanceQuota(ctx context.Context) error {
	if apt.IsSCInstance(ctx) {
		return nil
	}
	return quotasvc.ApplyInstance(ctx, 1)
}