/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package replicator

import (
	"context"
	"fmt"

	"github.com/go-chassis/foundation/gopool"
	"github.com/go-chassis/go-chassis/v2/server/restful"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	"github.com/apache/servicecomb-service-center/client"
	"github.com/apache/servicecomb-service-center/pkg/log"
	"github.com/apache/servicecomb-service-center/pkg/rpc"
	"github.com/apache/servicecomb-service-center/pkg/util"
	"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
	v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
	syncerclient "github.com/apache/servicecomb-service-center/syncer/client"
	"github.com/apache/servicecomb-service-center/syncer/config"
	"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
)

const (
	schema      = "grpc"
	serviceName = "syncer"
)

const (
	reservedSize = 512 * 1024
	maxSize      = 10*1024*1024 - reservedSize
)

var (
	manager = NewManager(make(map[string]struct{}, 1000))
)

var (
	conn      *grpc.ClientConn
	peerToken = ""
)

func Work() error {
	err := InitSyncClient()
	if err != nil {
		return err
	}

	gopool.Go(func(ctx context.Context) {
		<-ctx.Done()

		Close()
	})

	resource.InitManager()
	return err
}

func InitSyncClient() error {
	peer := config.GetConfig().Sync.Peers[0]
	log.Info(fmt.Sprintf("peer is %v", peer))
	var err error
	conn, err = rpc.GetRoundRobinLbConn(&rpc.Config{
		Addrs:       peer.Endpoints,
		Scheme:      schema,
		ServiceName: serviceName,
		TLSConfig:   syncerclient.RPClientConfig(),
	})
	if err != nil {
		log.Error("get rpc client failed", err)
		return err
	}
	if !config.GetConfig().Sync.RbacEnabled {
		return nil
	}
	peerToken, err = cipher.Decrypt(peer.Token)
	if err != nil {
		log.Error("decrypt peer token failed, use original content", err)
		peerToken = peer.Token
	}
	return nil
}

func Close() {
	if conn == nil {
		return
	}

	err := conn.Close()
	if err != nil {
		log.Error("close conn failed", err)
	}
}

func Manager() Replicator {
	return manager
}

// Replicator define replicator manager, receive events from event manager
// and send events to remote syncer
type Replicator interface {
	Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error)
	Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result
}

func NewManager(cache map[string]struct{}) Replicator {
	return &replicatorManager{
		cache: cache,
	}
}

type replicatorManager struct {
	cache map[string]struct{}
}

func (r *replicatorManager) Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) {
	return r.replicate(ctx, el)
}

func pageEvents(source *v1sync.EventList, max int) []*v1sync.EventList {
	els := make([]*v1sync.EventList, 0, 5)

	size := 0
	el := &v1sync.EventList{
		Events: make([]*v1sync.Event, 0, 20),
	}

	for _, event := range source.Events {
		lv := len(event.Value)
		if size+lv < max {
			el.Events = append(el.Events, event)
			size += lv
			continue
		}

		log.Info(fmt.Sprintf("size is %d", size))
		els = append(els, el)

		el = &v1sync.EventList{
			Events: make([]*v1sync.Event, 0, 20),
		}
		el.Events = append(el.Events, event)
		size = lv
	}

	log.Info(fmt.Sprintf("size is %d", size))
	els = append(els, el)

	return els
}

func (r *replicatorManager) replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) {
	log.Info(fmt.Sprintf("start replicate events %d", len(el.Events)))

	set := client.NewSet(conn)

	els := pageEvents(el, maxSize)

	result := &v1sync.Results{
		Results: make(map[string]*v1sync.Result, len(el.Events)),
	}

	log.Info(fmt.Sprintf("page count %d to sync", len(els)))
	if config.GetConfig().Sync.RbacEnabled {
		ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
			restful.HeaderAuth: "Bearer " + peerToken,
		}))
	}

	for _, in := range els {
		res, err := set.EventServiceClient.Sync(ctx, in)
		if err != nil {
			return nil, err
		}

		log.Info(fmt.Sprintf("replicate events success, count is %d", len(in.Events)))

		for k, v := range res.Results {
			log.Info(fmt.Sprintf("replicate event %s, %v", k, v))
			result.Results[k] = v
		}
	}

	log.Info(fmt.Sprintf("replicate events success %d", len(result.Results)))
	return result, nil
}

func (r *replicatorManager) Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result {
	if el == nil || len(el.Events) == 0 {
		return []*resource.Result{}
	}

	results := make([]*resource.Result, 0, len(el.Events))
	for _, event := range el.Events {
		log.Info(fmt.Sprintf("start handle event %s", event.Flag()))

		r, result := resource.New(event)
		if result != nil {
			results = append(results, result.WithEventID(event.Id))
			continue
		}

		ctx = util.SetDomain(ctx, event.Opts[string(util.CtxDomain)])
		ctx = util.SetProject(ctx, event.Opts[string(util.CtxProject)])

		result = r.LoadCurrentResource(ctx)
		if result != nil {
			results = append(results, result.WithEventID(event.Id))
			continue
		}

		result = r.NeedOperate(ctx)
		if result != nil {
			results = append(results, result.WithEventID(event.Id))
			continue
		}

		result = r.Operate(ctx)
		results = append(results, result.WithEventID(event.Id))

		log.Info(fmt.Sprintf("operate resource, event: %s, result: %s", event.Flag(), result.Flag()))
	}

	for _, result := range results {
		log.Info(fmt.Sprintf("handle event result %s", result.Flag()))
	}

	return results
}
