blob: e1b63c90de280b4c52c94d66230785bec1bda03f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mongo
import (
"context"
"fmt"
dmongo "github.com/go-chassis/cari/db/mongo"
"github.com/go-chassis/cari/discovery"
rbacmodel "github.com/go-chassis/cari/rbac"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/mongo/model"
"github.com/apache/servicecomb-service-center/datasource/mongo/sync"
"github.com/apache/servicecomb-service-center/pkg/log"
putil "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
)
const (
SyncAllKey = "sync-all"
)
type SyncManager struct {
}
// SyncAll will list all services,accounts,roles,schemas,tags,deps and use tasks to store
func (s *SyncManager) SyncAll(ctx context.Context) error {
enable := config.GetBool("sync.enableOnStart", false)
if !enable {
return nil
}
ctx = putil.SetContext(ctx, putil.CtxEnableSync, "1")
exist, err := syncAllKeyExist(ctx)
if err != nil {
return err
}
if exist {
log.Info(fmt.Sprintf("%s key already exists, do not need to do tasks", SyncAllKey))
return datasource.ErrSyncAllKeyExists
}
// TODO use mongo distributed lock
err = syncAllAccounts(ctx)
if err != nil {
return err
}
err = syncAllRoles(ctx)
if err != nil {
return err
}
err = syncAllServices(ctx)
if err != nil {
return err
}
return insertSyncAllKey(ctx)
}
func syncAllKeyExist(ctx context.Context) (bool, error) {
count, err := dmongo.GetClient().GetDB().Collection(model.CollectionSync).CountDocuments(ctx, bson.M{"key": SyncAllKey})
if err != nil {
return false, err
}
if count > 0 {
return true, nil
}
return false, nil
}
func syncAllAccounts(ctx context.Context) error {
cursor, err := dmongo.GetClient().GetDB().Collection(model.CollectionAccount).Find(ctx, bson.M{})
if err != nil {
return err
}
defer func(cursor *mongo.Cursor, ctx context.Context) {
err := cursor.Close(ctx)
if err != nil {
log.Error("fail to close mongo cursor", err)
}
}(cursor, ctx)
for cursor.Next(ctx) {
var account rbacmodel.Account
err = cursor.Decode(&account)
if err != nil {
log.Error("failed to decode account", err)
return err
}
err = sync.DoCreateOpts(ctx, datasource.ResourceAccount, &account)
if err != nil {
log.Error("failed to create account task", err)
return err
}
}
return nil
}
func syncAllRoles(ctx context.Context) error {
cursor, err := dmongo.GetClient().GetDB().Collection(model.CollectionRole).Find(ctx, bson.M{})
if err != nil {
return err
}
defer func(cursor *mongo.Cursor, ctx context.Context) {
err := cursor.Close(ctx)
if err != nil {
log.Error("fail to close mongo cursor", err)
}
}(cursor, ctx)
for cursor.Next(ctx) {
var role rbacmodel.Role
err = cursor.Decode(&role)
if err != nil {
log.Error("failed to decode role", err)
return err
}
err = sync.DoCreateOpts(ctx, datasource.ResourceRole, &role)
if err != nil {
log.Error("failed to create role task", err)
return err
}
}
return nil
}
func syncAllServices(ctx context.Context) error {
cursor, err := dmongo.GetClient().GetDB().Collection(model.CollectionService).Find(ctx, bson.M{})
if err != nil {
return err
}
defer func(cursor *mongo.Cursor, ctx context.Context) {
err := cursor.Close(ctx)
if err != nil {
log.Error("fail to close mongo cursor", err)
}
}(cursor, ctx)
for cursor.Next(ctx) {
var tmp model.Service
err := cursor.Decode(&tmp)
if err != nil {
return err
}
request := &discovery.CreateServiceRequest{
Service: tmp.Service,
Tags: tmp.Tags,
}
putil.SetDomain(ctx, tmp.Domain)
putil.SetProject(ctx, tmp.Project)
err = sync.DoCreateOpts(ctx, datasource.ResourceService, request)
if err != nil {
log.Error("failed to create service task", err)
return err
}
}
return nil
}
func insertSyncAllKey(ctx context.Context) error {
_, err := dmongo.GetClient().GetDB().Collection(model.CollectionSync).InsertOne(ctx, bson.M{"key": SyncAllKey})
return err
}