blob: 9ee89e6bbeaa8017e0f0da5dd702f1429de97be0 [file] [log] [blame]
package replicator
import (
"context"
"fmt"
"github.com/apache/servicecomb-service-center/client"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/rpc"
"github.com/apache/servicecomb-service-center/pkg/util"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
syncerclient "github.com/apache/servicecomb-service-center/syncer/client"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
"github.com/go-chassis/foundation/gopool"
"google.golang.org/grpc"
)
const (
schema = "grpc"
serviceName = "syncer"
)
var (
manager = NewManager(make(map[string]struct{}, 1000))
)
var (
conn *grpc.ClientConn
)
func Work() error {
err := InitSyncClient()
if err != nil {
return err
}
gopool.Go(func(ctx context.Context) {
<-ctx.Done()
Close()
})
resource.InitManager()
return err
}
func InitSyncClient() error {
cfg := config.GetConfig()
peer := cfg.Sync.Peers[0]
log.Info(fmt.Sprintf("peer is %v", peer))
var err error
conn, err = rpc.GetRoundRobinLbConn(&rpc.Config{
Addrs: peer.Endpoints,
Scheme: schema,
ServiceName: serviceName,
TLSConfig: syncerclient.RPClientConfig(),
})
return err
}
func Close() {
if conn == nil {
return
}
err := conn.Close()
if err != nil {
log.Error("close conn failed", err)
}
}
func Manager() Replicator {
return manager
}
// Replicator define replicator manager, receive events from event manager
// and send events to remote syncer
type Replicator interface {
Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error)
Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result
}
func NewManager(cache map[string]struct{}) Replicator {
return &replicatorManager{
cache: cache,
}
}
type replicatorManager struct {
cache map[string]struct{}
}
func (r *replicatorManager) Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) {
return r.replicate(ctx, el)
}
func (r *replicatorManager) replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) {
log.Info(fmt.Sprintf("start replicate events %d", len(el.Events)))
set := client.NewSet(conn)
result, err := set.EventServiceClient.Sync(ctx, el)
if err != nil {
return nil, err
}
log.Info(fmt.Sprintf("replicate events success %d", len(result.Results)))
return result, nil
}
func (r *replicatorManager) Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result {
if el == nil || len(el.Events) == 0 {
return []*resource.Result{}
}
results := make([]*resource.Result, 0, len(el.Events))
for _, event := range el.Events {
log.Info(fmt.Sprintf("start handle event %s", event.Flag()))
r, result := resource.New(event)
if result != nil {
results = append(results, result.WithEventID(event.Id))
continue
}
ctx = util.SetDomain(ctx, event.Opts[string(util.CtxDomain)])
ctx = util.SetProject(ctx, event.Opts[string(util.CtxProject)])
result = r.LoadCurrentResource(ctx)
if result != nil {
results = append(results, result.WithEventID(event.Id))
continue
}
result = r.NeedOperate(ctx)
if result != nil {
results = append(results, result.WithEventID(event.Id))
continue
}
result = r.Operate(ctx)
results = append(results, result.WithEventID(event.Id))
log.Info(fmt.Sprintf("operate resource %s, %s", event.Flag(), result.Flag()))
}
for _, result := range results {
log.Info(fmt.Sprintf("handle event result %s", result.Flag()))
}
return results
}