blob: f60c3b3d0212823fd53d0bde142ba0a663c95cac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package servicecenter
import (
"github.com/apache/servicecomb-service-center/pkg/model"
"net/url"
"strconv"
"strings"
"github.com/apache/servicecomb-service-center/pkg/log"
scpb "github.com/apache/servicecomb-service-center/server/core/proto"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
"github.com/gogo/protobuf/proto"
)
// Expansion kinds attached to synced services and instances by this plugin.
const (
	// expansionSchema is the Kind of an expansion whose Bytes hold a
	// proto-marshaled service-center schema.
	expansionSchema = "schema"

	// expansionDatasource is the Kind of an expansion whose Bytes hold the
	// proto-marshaled original service-center service or instance.
	expansionDatasource = "datasource"
)
// toSyncData converts a service-center cache snapshot into SyncData.
//
// Every microservice in the cache is converted with toSyncService, tagged with
// the domain/project taken from its cache key, and extended with the schema
// expansions that belong to it. A service is left out of the result when no
// domain/project can be derived from its key, or when toSyncInstances yields
// no instances for it.
func toSyncData(cache *model.Cache, schemas []*scpb.Schema) (data *pb.SyncData) {
	data = &pb.SyncData{
		Services:  make([]*pb.SyncService, 0, len(cache.Microservices)),
		Instances: make([]*pb.SyncInstance, 0, len(cache.Instances)),
	}

	for _, entry := range cache.Microservices {
		tenant := getDomainProjectFromServiceKey(entry.Key)
		if tenant == "" {
			continue
		}

		converted := toSyncService(entry.Value)
		converted.DomainProject = tenant
		converted.Expansions = append(converted.Expansions, schemaExpansions(entry.Value, schemas)...)

		// Only services that still have matching instances are synced.
		if owned := toSyncInstances(converted.ServiceId, cache.Instances); len(owned) > 0 {
			data.Services = append(data.Services, converted)
			data.Instances = append(data.Instances, owned...)
		}
	}
	return data
}
// toSyncService converts a service-center MicroService into a SyncService.
//
// The identifying fields are copied, the status is mapped to the SyncService
// status enum (anything other than UP/DOWN becomes UNKNOWN), and the original
// service is proto-marshaled into a single "datasource" expansion. When the
// marshal fails the error is logged and the SyncService is returned without
// expansions.
func toSyncService(service *scpb.MicroService) (syncService *pb.SyncService) {
	status := pb.SyncService_UNKNOWN
	if service.Status == scpb.MS_UP {
		status = pb.SyncService_UP
	} else if service.Status == scpb.MS_DOWN {
		status = pb.SyncService_DOWN
	}

	syncService = &pb.SyncService{
		ServiceId:   service.ServiceId,
		App:         service.AppId,
		Name:        service.ServiceName,
		Version:     service.Version,
		Environment: service.Environment,
		PluginName:  PluginName,
		Status:      status,
	}

	raw, err := proto.Marshal(service)
	if err == nil {
		// Keep the full original service so toService can restore it verbatim.
		syncService.Expansions = []*pb.Expansion{{
			Kind:   expansionDatasource,
			Bytes:  raw,
			Labels: map[string]string{},
		}}
		return syncService
	}

	log.Errorf(err, "transform sc service to syncer service failed: %s", err)
	return syncService
}
// toSyncInstances converts the service-center instances that belong to
// serviceID into SyncInstances. Instances whose status is not MSI_UP, or whose
// ServiceId differs from serviceID, are skipped. The result is nil when
// nothing matches.
func toSyncInstances(serviceID string, instances []*model.Instance) (syncInstances []*pb.SyncInstance) {
	for _, entry := range instances {
		candidate := entry.Value
		if candidate.Status == scpb.MSI_UP && candidate.ServiceId == serviceID {
			syncInstances = append(syncInstances, toSyncInstance(serviceID, candidate))
		}
	}
	return syncInstances
}
// toSyncInstance converts a service-center instance into a SyncInstance owned
// by serviceID.
//
// Endpoints using the "rest" scheme are rewritten to "http://", or to
// "https://" when their sslEnabled query parameter parses as true; all other
// endpoints are copied unchanged, and endpoints that fail url.Parse are logged
// and dropped. The health check settings (port, interval, times, url) are
// copied when present, and the original instance is proto-marshaled into a
// single "datasource" expansion. When the marshal fails the error is logged
// and the SyncInstance is returned without expansions.
//
// NOTE(review): HealthCheck.Mode is not copied here, so it presumably stays at
// its zero value on the SyncInstance — confirm whether that is intended.
func toSyncInstance(serviceID string, instance *scpb.MicroServiceInstance) (syncInstance *pb.SyncInstance) {
	status := pb.SyncInstance_UNKNOWN
	switch instance.Status {
	case scpb.MSI_UP:
		status = pb.SyncInstance_UP
	case scpb.MSI_DOWN:
		status = pb.SyncInstance_DOWN
	case scpb.MSI_STARTING:
		status = pb.SyncInstance_STARTING
	case scpb.MSI_OUTOFSERVICE:
		status = pb.SyncInstance_OUTOFSERVICE
	}

	syncInstance = &pb.SyncInstance{
		InstanceId: instance.InstanceId,
		ServiceId:  serviceID,
		Endpoints:  make([]string, 0, len(instance.Endpoints)),
		HostName:   instance.HostName,
		Version:    instance.Version,
		PluginName: PluginName,
		Status:     status,
	}

	for _, raw := range instance.Endpoints {
		parsed, err := url.Parse(raw)
		if err != nil {
			log.Errorf(err, "parse sc instance endpoint failed: %s", err)
			continue
		}

		converted := raw
		if parsed.Scheme == "rest" {
			// A missing or unparsable sslEnabled value counts as false.
			scheme := "http://"
			if ssl, _ := strconv.ParseBool(parsed.Query().Get("sslEnabled")); ssl {
				scheme = "https://"
			}
			converted = strings.Replace(raw, "rest://", scheme, 1)
		}
		syncInstance.Endpoints = append(syncInstance.Endpoints, converted)
	}

	if hc := instance.HealthCheck; hc != nil {
		syncInstance.HealthCheck = &pb.HealthCheck{
			Port:     hc.Port,
			Interval: hc.Interval,
			Times:    hc.Times,
			Url:      hc.Url,
		}
	}

	raw, err := proto.Marshal(instance)
	if err == nil {
		// Keep the full original instance so toInstance can restore it verbatim.
		syncInstance.Expansions = []*pb.Expansion{{
			Kind:   expansionDatasource,
			Bytes:  raw,
			Labels: map[string]string{},
		}}
		return syncInstance
	}

	log.Errorf(err, "transform sc instance to syncer instance failed: %s", err)
	return syncInstance
}
// schemaExpansions builds one "schema" expansion for every entry of schemas
// whose SchemaId is listed in service.Schemas. Each expansion carries the
// proto-marshaled schema; a schema that fails to marshal is logged and
// skipped. The result is nil when no schema matches.
func schemaExpansions(service *scpb.MicroService, schemas []*scpb.Schema) (expansions []*pb.Expansion) {
	for _, schema := range schemas {
		if inSlice(service.Schemas, schema.SchemaId) {
			payload, err := proto.Marshal(schema)
			if err != nil {
				log.Errorf(err, "proto marshal schemas failed, app = %s, service = %s, version = %s datasource = %s",
					service.AppId, service.ServiceName, service.Version, expansionSchema)
				continue
			}
			expansions = append(expansions, &pb.Expansion{
				Kind:   expansionSchema,
				Bytes:  payload,
				Labels: map[string]string{},
			})
		}
	}
	return expansions
}
// toService converts a SyncService back into a service-center MicroService.
//
// When the SyncService was produced by this plugin and carries a "datasource"
// expansion, the original service is restored by unmarshaling that expansion
// and only ServiceId is overwritten with the SyncService value. If there is no
// such expansion, or unmarshaling fails, the service is rebuilt from the
// plain SyncService fields instead.
func toService(syncService *pb.SyncService) (service *scpb.MicroService) {
	service = &scpb.MicroService{}
	if syncService.PluginName == PluginName && len(syncService.Expansions) > 0 {
		matches := pb.Expansions(syncService.Expansions).Find(expansionDatasource, map[string]string{})
		if len(matches) > 0 {
			err := proto.Unmarshal(matches[0].Bytes, service)
			if err == nil {
				service.ServiceId = syncService.ServiceId
				return service
			}
			// Report the ID from the sync record: the target message was not
			// decoded successfully, so its own ServiceId cannot be trusted.
			log.Errorf(err, "proto unmarshal %s service, serviceID = %s, kind = %v, content = %v failed",
				PluginName, syncService.ServiceId, matches[0].Kind, matches[0].Bytes)
			// Drop whatever the failed unmarshal may have partially filled in
			// so the fallback below starts from a clean message.
			service = &scpb.MicroService{}
		}
	}

	service.AppId = syncService.App
	service.ServiceId = syncService.ServiceId
	service.ServiceName = syncService.Name
	service.Version = syncService.Version
	service.Status = pb.SyncService_Status_name[int32(syncService.Status)]
	service.Environment = syncService.Environment
	return service
}
// toInstance converts a SyncInstance back into a service-center instance.
//
// When the SyncInstance was produced by this plugin and carries a
// "datasource" expansion, the original instance is restored by unmarshaling
// that expansion and only InstanceId and ServiceId are overwritten with the
// SyncInstance values. If there is no such expansion, or unmarshaling fails,
// the instance is rebuilt from the plain SyncInstance fields:
//
//   - "http" endpoints become "rest" endpoints;
//   - "https" endpoints become "rest" endpoints carrying sslEnabled=true;
//   - "rest" and "highway" endpoints are kept as they are;
//   - endpoints that cannot be parsed or use any other scheme are logged and
//     skipped;
//   - the health check is copied only when its mode is not UNKNOWN.
func toInstance(syncInstance *pb.SyncInstance) (instance *scpb.MicroServiceInstance) {
	instance = &scpb.MicroServiceInstance{}
	if syncInstance.PluginName == PluginName && len(syncInstance.Expansions) > 0 {
		matches := pb.Expansions(syncInstance.Expansions).Find(expansionDatasource, map[string]string{})
		if len(matches) > 0 {
			err := proto.Unmarshal(matches[0].Bytes, instance)
			if err == nil {
				instance.InstanceId = syncInstance.InstanceId
				instance.ServiceId = syncInstance.ServiceId
				return instance
			}
			// Report the ID from the sync record: the target message was not
			// decoded successfully, so its own InstanceId cannot be trusted.
			log.Errorf(err, "proto unmarshal %s instance, instanceID = %s, kind = %v, content = %v failed",
				PluginName, syncInstance.InstanceId, matches[0].Kind, matches[0].Bytes)
			// Drop whatever the failed unmarshal may have partially filled in
			// so the fallback below starts from a clean message.
			instance = &scpb.MicroServiceInstance{}
		}
	}

	instance.InstanceId = syncInstance.InstanceId
	instance.ServiceId = syncInstance.ServiceId
	instance.Endpoints = make([]string, 0, len(syncInstance.Endpoints))
	instance.HostName = syncInstance.HostName
	instance.Version = syncInstance.Version
	instance.Status = pb.SyncInstance_Status_name[int32(syncInstance.Status)]

	for _, ep := range syncInstance.Endpoints {
		addr, err := url.Parse(ep)
		if err != nil {
			log.Errorf(err, "parse sc instance endpoint failed: %s", err)
			continue
		}

		var endpoint string
		switch addr.Scheme {
		case "http":
			endpoint = strings.Replace(ep, "http://", "rest://", 1)
		case "https":
			endpoint = strings.Replace(ep, "https://", "rest://", 1)
			// Add the TLS marker only when it is not already present, and
			// join it with "&" when the endpoint already has a query string;
			// blindly appending "?sslEnabled=true" would corrupt the query.
			if ssl, _ := strconv.ParseBool(addr.Query().Get("sslEnabled")); !ssl {
				separator := "?"
				if strings.Contains(ep, "?") {
					separator = "&"
				}
				endpoint += separator + "sslEnabled=true"
			}
		case "rest", "highway":
			endpoint = ep
		default:
			// Never register an empty endpoint for a scheme we cannot map.
			log.Errorf(nil, "unsupported scheme %q of sync instance endpoint %q, skipped", addr.Scheme, ep)
			continue
		}
		instance.Endpoints = append(instance.Endpoints, endpoint)
	}

	if hc := syncInstance.HealthCheck; hc != nil && hc.Mode != pb.HealthCheck_UNKNOWN {
		instance.HealthCheck = &scpb.HealthCheck{
			Mode:     pb.HealthCheck_Modes_name[int32(hc.Mode)],
			Port:     hc.Port,
			Interval: hc.Interval,
			Times:    hc.Times,
			Url:      hc.Url,
		}
	}
	return instance
}
// getDomainProjectFromServiceKey extracts "<domain>/<project>" from a service
// cache key. The key is split on "/" and the fifth and sixth segments
// (indexes 4 and 5) are joined back with "/". An empty string is returned
// when the key has fewer than six segments.
func getDomainProjectFromServiceKey(serviceKey string) string {
	segments := strings.Split(serviceKey, "/")
	if len(segments) >= 6 {
		return segments[4] + "/" + segments[5]
	}
	return ""
}
// inSlice reports whether val is equal to at least one element of slice.
// It returns false for a nil or empty slice.
func inSlice(slice []string, val string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] == val {
			return true
		}
	}
	return false
}