blob: 2546e369c584c9aa8cba3f2a5a46054d23f5051b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcd
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/go-chassis/cari/discovery"
crbac "github.com/go-chassis/cari/rbac"
"github.com/little-cui/etcdadpt"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
"github.com/apache/servicecomb-service-center/pkg/log"
putil "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
)
const (
SyncAllKey = "/cse-sr/sync-all"
SyncAllLockKey = "/cse-sr/sync-all-lock"
// one minutes
defaultLockTime = 60
)
var (
ErrWithoutDomainProject = errors.New("key without domain and project")
)
type SyncManager struct {
}
// SyncAll will list all services,accounts,roles,schemas,tags,deps and use tasks to store
func (s *SyncManager) SyncAll(ctx context.Context) error {
enable := config.GetBool("sync.enableOnStart", false)
if !enable {
return nil
}
ctx = putil.SetContext(ctx, putil.CtxEnableSync, "1")
exist, err := etcdadpt.Exist(ctx, SyncAllKey)
if err != nil {
return err
}
if exist {
log.Info(fmt.Sprintf("%s key already exists, do not need to do tasks", SyncAllKey))
return datasource.ErrSyncAllKeyExists
}
lock, err := etcdadpt.TryLock(SyncAllLockKey, defaultLockTime)
if err != nil || lock == nil {
log.Info(fmt.Sprintf("%s lock not acquired", SyncAllLockKey))
return nil
}
defer func(lock *etcdadpt.DLock) {
err := lock.Unlock()
if err != nil {
log.Error(fmt.Sprintf("fail to unlock the %s key", SyncAllLockKey), err)
}
}(lock)
err = syncAllRoles(ctx)
if err != nil {
return err
}
err = syncAllAccounts(ctx)
if err != nil {
return err
}
err = syncAllServices(ctx)
if err != nil {
return err
}
err = syncAllTags(ctx)
if err != nil {
return err
}
err = syncAllSchemas(ctx)
if err != nil {
return err
}
err = syncAllDependencies(ctx)
if err != nil {
return err
}
return etcdadpt.Put(ctx, SyncAllKey, "1")
}
func syncAllAccounts(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACAccountKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
putil.SetDomain(ctx, "")
putil.SetProject(ctx, "")
for _, v := range kvs {
a := &crbac.Account{}
err = json.Unmarshal(v.Value, a)
if err != nil {
log.Error("fail to unmarshal account ", err)
return err
}
opt, err := esync.GenCreateOpts(ctx, datasource.ResourceAccount, a)
if err != nil {
log.Error("fail to create sync opts", err)
return err
}
syncOpts = append(syncOpts, opt...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to account tasks", err)
}
return err
}
func syncAllRoles(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACRoleKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
putil.SetDomain(ctx, "")
putil.SetProject(ctx, "")
for _, v := range kvs {
r := &crbac.Role{}
err = json.Unmarshal(v.Value, r)
if err != nil {
log.Error("fail to unmarshal role", err)
return err
}
opt, err := esync.GenCreateOpts(ctx, datasource.ResourceRole, r)
if err != nil {
log.Error("fail to create sync opts", err)
return err
}
syncOpts = append(syncOpts, opt...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to role tasks", err)
}
return err
}
// syncAllTags func use kv resource task to store tags
func syncAllTags(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GetServiceTagRootKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
for _, kv := range kvs {
domain, project, err := getDomainProject(string(kv.Key), path.GetServiceTagRootKey(""))
if err != nil {
log.Error("fail to get domain and project", err)
return err
}
putil.SetDomain(ctx, domain)
putil.SetProject(ctx, project)
opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
esync.WithOpts(map[string]string{"key": string(kv.Key)}))
if err != nil {
log.Error("fail to create tag opts", err)
return err
}
syncOpts = append(syncOpts, opts...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to create tag tasks", err)
}
return err
}
func syncAllServices(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GetServiceRootKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
for _, kv := range kvs {
service := &discovery.MicroService{}
err := json.Unmarshal(kv.Value, service)
if err != nil {
log.Error("fail to unmarshal service", err)
return err
}
domain, project, err := getDomainProject(string(kv.Key), path.GetServiceRootKey(""))
if err != nil {
log.Error("fail to get domain and project", err)
return err
}
putil.SetDomain(ctx, domain)
putil.SetProject(ctx, project)
request := &discovery.CreateServiceRequest{
Service: service,
}
opts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, request)
if err != nil {
log.Error("fail to create service task", err)
return err
}
syncOpts = append(syncOpts, opts...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to create service tasks", err)
}
return err
}
// syncAllSchemas func use kv resource task to store schemas
func syncAllSchemas(ctx context.Context) error {
putil.SetDomain(ctx, "")
putil.SetProject(ctx, "")
err := syncAllServiceSchemas(ctx)
if err != nil {
return err
}
err = syncAllServiceSchemaRefs(ctx)
if err != nil {
return err
}
err = syncAllServiceSchemaContents(ctx)
if err != nil {
return err
}
return syncAllServiceSchemaSummaries(ctx)
}
func syncAllServiceSchemas(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRootKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
for _, kv := range kvs {
domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRootKey(""))
if err != nil {
log.Error("fail to get domain and project", err)
return err
}
putil.SetDomain(ctx, domain)
putil.SetProject(ctx, project)
opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
esync.WithOpts(map[string]string{"key": string(kv.Key)}))
if err != nil {
log.Error("fail to create schema opts", err)
return err
}
syncOpts = append(syncOpts, opts...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to create schema tasks", err)
}
return err
}
func syncAllServiceSchemaRefs(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRefRootKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
for _, kv := range kvs {
domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRefRootKey(""))
if err != nil {
log.Error("fail to get domain and project", err)
return err
}
putil.SetDomain(ctx, domain)
putil.SetProject(ctx, project)
opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
esync.WithOpts(map[string]string{"key": string(kv.Key)}))
if err != nil {
log.Error("fail to create schema ref opts", err)
return err
}
syncOpts = append(syncOpts, opts...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to create schema ref tasks", err)
}
return err
}
func syncAllServiceSchemaContents(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaContentRootKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
for _, kv := range kvs {
domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaContentRootKey(""))
if err != nil {
log.Error("fail to get domain and project", err)
return err
}
putil.SetDomain(ctx, domain)
putil.SetProject(ctx, project)
opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
esync.WithOpts(map[string]string{"key": string(kv.Key)}))
if err != nil {
log.Error("fail to create schema content opts", err)
return err
}
syncOpts = append(syncOpts, opts...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to create schema content tasks", err)
}
return err
}
func syncAllServiceSchemaSummaries(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaSummaryRootKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
for _, kv := range kvs {
domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaSummaryRootKey(""))
if err != nil {
log.Error("fail to get domain and project", err)
return err
}
putil.SetDomain(ctx, domain)
putil.SetProject(ctx, project)
opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
esync.WithOpts(map[string]string{"key": string(kv.Key)}))
if err != nil {
log.Error("fail to create schema summary opts", err)
return err
}
syncOpts = append(syncOpts, opts...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to create schema summary tasks", err)
}
return err
}
func syncAllDependencies(ctx context.Context) error {
kvs, _, err := etcdadpt.List(ctx, path.GetServiceDependencyQueueRootKey(""))
if err != nil {
return err
}
syncOpts := make([]etcdadpt.OpOptions, 0)
for _, kv := range kvs {
domain, project, err := getDomainProject(string(kv.Key), path.GetServiceDependencyQueueRootKey(""))
if err != nil {
log.Error("fail to get domain and project", err)
return err
}
putil.SetDomain(ctx, domain)
putil.SetProject(ctx, project)
opts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, kv.Value, esync.WithOpts(map[string]string{"key": string(kv.Key)}))
if err != nil {
log.Error("fail to create dep opts", err)
return err
}
syncOpts = append(syncOpts, opts...)
}
err = etcdadpt.Txn(ctx, syncOpts)
if err != nil {
log.Error("fail to create dep tasks", err)
}
return err
}
func getDomainProject(key string, prefixKey string) (domain string, project string, err error) {
splitKey := strings.Split(key, prefixKey)
if len(splitKey) != 2 {
return "", "", ErrWithoutDomainProject
}
suffixKey := splitKey[len(splitKey)-1]
splitStr := strings.Split(suffixKey, "/")
if len(splitStr) < 2 {
return "", "", ErrWithoutDomainProject
}
domain = splitStr[0]
project = splitStr[1]
return
}