/*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
*/

package rmproxy

import (
	"fmt"
	"reflect"
	"strconv"
	"sync"

	"go.uber.org/zap"

	"github.com/apache/yunikorn-core/pkg/common"
	"github.com/apache/yunikorn-core/pkg/handler"
	"github.com/apache/yunikorn-core/pkg/log"
	"github.com/apache/yunikorn-core/pkg/metrics"
	"github.com/apache/yunikorn-core/pkg/plugins"
	"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
	siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

// Gateway to talk to ResourceManager (behind grpc/API of scheduler-interface)
type RMProxy struct {
	EventHandlers handler.EventHandlers
	stop          chan struct{}

	// Internal fields
	pendingRMEvents chan interface{}

	rmIDToCallback map[string]api.ResourceManagerCallback

	sync.RWMutex
}

func (rmp *RMProxy) GetRMEventHandler() handler.EventHandler {
	return rmp
}

func enqueueAndCheckFull(queue chan interface{}, ev interface{}) {
	select {
	case queue <- ev:
		log.Log(log.RMProxy).Debug("enqueue event",
			zap.Stringer("eventType", reflect.TypeOf(ev)),
			zap.Any("event", ev),
			zap.Int("currentQueueSize", len(queue)))
	default:
		log.Log(log.RMProxy).DPanic("failed to enqueue event",
			zap.Stringer("event", reflect.TypeOf(ev)))
	}
}

func (rmp *RMProxy) HandleEvent(ev interface{}) {
	enqueueAndCheckFull(rmp.pendingRMEvents, ev)
}

func NewRMProxy() *RMProxy {
	rm := &RMProxy{
		rmIDToCallback:  make(map[string]api.ResourceManagerCallback),
		pendingRMEvents: make(chan interface{}, 1024*1024),
		stop:            make(chan struct{}),
	}
	return rm
}

func (rmp *RMProxy) StartService(handlers handler.EventHandlers) {
	rmp.EventHandlers = handlers

	go rmp.handleRMEvents()
}

func (rmp *RMProxy) handleUpdateResponseError(rmID string, err error) {
	log.Log(log.RMProxy).Error("failed to handle response",
		zap.String("rmID", rmID),
		zap.Error(err))
}

func (rmp *RMProxy) processAllocationUpdateEvent(event *rmevent.RMNewAllocationsEvent) {
	allocationsCount := len(event.Allocations)
	if allocationsCount != 0 {
		rmp.RLock()
		defer rmp.RUnlock()
		response := &si.AllocationResponse{
			New: event.Allocations,
		}
		rmp.triggerUpdateAllocation(event.RmID, response)
		metrics.GetSchedulerMetrics().AddAllocatedContainers(len(event.Allocations))
	}
	// Done, notify channel
	event.Channel <- &rmevent.Result{
		Succeeded: true,
		Reason:    "no. of allocations: " + strconv.Itoa(allocationsCount),
	}
}

func (rmp *RMProxy) processApplicationUpdateEvent(event *rmevent.RMApplicationUpdateEvent) {
	rmp.RLock()
	defer rmp.RUnlock()
	if len(event.RejectedApplications) == 0 && len(event.AcceptedApplications) == 0 && len(event.UpdatedApplications) == 0 {
		return
	}
	response := &si.ApplicationResponse{
		Rejected: event.RejectedApplications,
		Accepted: event.AcceptedApplications,
		Updated:  event.UpdatedApplications,
	}
	if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
		if err := callback.UpdateApplication(response); err != nil {
			rmp.handleUpdateResponseError(event.RmID, err)
		}
	} else {
		log.Log(log.RMProxy).DPanic("RM is not registered",
			zap.String("rmID", event.RmID))
	}

	// update app metrics
	if len(event.RejectedApplications) > 0 {
		metrics.GetSchedulerMetrics().AddTotalApplicationsRejected(len(event.RejectedApplications))
	}
	if len(event.AcceptedApplications) > 0 {
		metrics.GetSchedulerMetrics().AddTotalApplicationsAccepted(len(event.AcceptedApplications))
	}
}

func (rmp *RMProxy) processRMReleaseAllocationEvent(event *rmevent.RMReleaseAllocationEvent) {
	allocationsCount := len(event.ReleasedAllocations)
	if allocationsCount != 0 {
		rmp.RLock()
		defer rmp.RUnlock()
		response := &si.AllocationResponse{
			Released: event.ReleasedAllocations,
		}
		rmp.triggerUpdateAllocation(event.RmID, response)
		metrics.GetSchedulerMetrics().AddReleasedContainers(len(event.ReleasedAllocations))
	}

	// Done, notify channel
	event.Channel <- &rmevent.Result{
		Succeeded: true,
		Reason:    "no. of allocations: " + strconv.Itoa(allocationsCount),
	}
}

func (rmp *RMProxy) triggerUpdateAllocation(rmID string, response *si.AllocationResponse) {
	if callback := rmp.rmIDToCallback[rmID]; callback != nil {
		if err := callback.UpdateAllocation(response); err != nil {
			rmp.handleUpdateResponseError(rmID, err)
		}
	} else {
		log.Log(log.RMProxy).DPanic("RM is not registered",
			zap.String("rmID", rmID))
	}
}

func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event *rmevent.RMReleaseAllocationAskEvent) {
	rmp.RLock()
	defer rmp.RUnlock()
	if len(event.ReleasedAllocationAsks) == 0 {
		return
	}
	response := &si.AllocationResponse{
		ReleasedAsks: event.ReleasedAllocationAsks,
	}
	rmp.triggerUpdateAllocation(event.RmID, response)
}

func (rmp *RMProxy) processRMRejectedAllocationAskEvent(event *rmevent.RMRejectedAllocationAskEvent) {
	rmp.RLock()
	defer rmp.RUnlock()
	if len(event.RejectedAllocationAsks) == 0 {
		return
	}
	response := &si.AllocationResponse{
		Rejected: event.RejectedAllocationAsks,
	}
	rmp.triggerUpdateAllocation(event.RmID, response)
	metrics.GetSchedulerMetrics().AddRejectedContainers(len(event.RejectedAllocationAsks))
}

func (rmp *RMProxy) processRMRejectedAllocationEvent(event *rmevent.RMRejectedAllocationEvent) {
	rmp.RLock()
	defer rmp.RUnlock()
	if len(event.RejectedAllocations) == 0 {
		return
	}
	response := &si.AllocationResponse{
		RejectedAllocations: event.RejectedAllocations,
	}
	rmp.triggerUpdateAllocation(event.RmID, response)
	metrics.GetSchedulerMetrics().AddRejectedContainers(len(event.RejectedAllocations))
}

func (rmp *RMProxy) processRMNodeUpdateEvent(event *rmevent.RMNodeUpdateEvent) {
	rmp.RLock()
	defer rmp.RUnlock()
	if len(event.RejectedNodes) == 0 && len(event.AcceptedNodes) == 0 {
		return
	}
	response := &si.NodeResponse{
		Rejected: event.RejectedNodes,
		Accepted: event.AcceptedNodes,
	}

	if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
		if err := callback.UpdateNode(response); err != nil {
			rmp.handleUpdateResponseError(event.RmID, err)
		}
	} else {
		log.Log(log.RMProxy).DPanic("RM is not registered",
			zap.String("rmID", event.RmID))
	}
}

func (rmp *RMProxy) handleRMEvents() {
	for {
		select {
		case ev := <-rmp.pendingRMEvents:
			switch v := ev.(type) {
			case *rmevent.RMNewAllocationsEvent:
				rmp.processAllocationUpdateEvent(v)
			case *rmevent.RMApplicationUpdateEvent:
				rmp.processApplicationUpdateEvent(v)
			case *rmevent.RMReleaseAllocationEvent:
				rmp.processRMReleaseAllocationEvent(v)
			case *rmevent.RMRejectedAllocationAskEvent:
				rmp.processRMRejectedAllocationAskEvent(v)
			case *rmevent.RMRejectedAllocationEvent:
				rmp.processRMRejectedAllocationEvent(v)
			case *rmevent.RMNodeUpdateEvent:
				rmp.processRMNodeUpdateEvent(v)
			case *rmevent.RMReleaseAllocationAskEvent:
				rmp.processRMReleaseAllocationAskEvent(v)
			default:
				panic(fmt.Sprintf("%s is not an acceptable type for RM event.", reflect.TypeOf(v).String()))
			}
		case <-rmp.stop:
			return
		}
	}
}

func (rmp *RMProxy) RegisterResourceManager(request *si.RegisterResourceManagerRequest, callback api.ResourceManagerCallback) (*si.RegisterResourceManagerResponse, error) {
	rmp.Lock()
	defer rmp.Unlock()
	c := make(chan *rmevent.Result)

	// If this is a re-register we need to clean up first
	if rmp.rmIDToCallback[request.RmID] != nil {
		go func() {
			rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
				&rmevent.RMPartitionsRemoveEvent{
					RmID:    request.RmID,
					Channel: c,
				})
		}()

		result := <-c
		close(c)
		if !result.Succeeded {
			return nil, fmt.Errorf("registration of RM failed: %v", result.Reason)
		}
	}

	c = make(chan *rmevent.Result)

	// Add new RM.
	go func() {
		rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
			&rmevent.RMRegistrationEvent{
				Registration: request,
				Channel:      c,
			})
	}()

	// Wait from channel
	result := <-c
	if result.Succeeded {
		rmp.rmIDToCallback[request.RmID] = callback

		// RM callback can optionally implement one or more scheduler plugin interfaces,
		// register scheduler plugin if the callback implements any plugin interface
		plugins.RegisterSchedulerPlugin(callback)

		return &si.RegisterResourceManagerResponse{}, nil
	}
	return nil, fmt.Errorf("registration of RM failed: %v", result.Reason)
}

func (rmp *RMProxy) GetResourceManagerCallback(rmID string) api.ResourceManagerCallback {
	rmp.RLock()
	defer rmp.RUnlock()

	return rmp.rmIDToCallback[rmID]
}

func (rmp *RMProxy) UpdateAllocation(request *si.AllocationRequest) error {
	if rmp.GetResourceManagerCallback(request.RmID) == nil {
		return fmt.Errorf("received AllocationRequest, but RmID=\"%s\" not registered", request.RmID)
	}
	// Update allocations
	for _, alloc := range request.Allocations {
		alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
	}

	// Update asks
	for _, ask := range request.Asks {
		ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
	}

	// Update releases
	if request.Releases != nil {
		for _, rel := range request.Releases.AllocationsToRelease {
			rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
		}
		for _, rel := range request.Releases.AllocationAsksToRelease {
			rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
		}
	}
	rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
	return nil
}

func (rmp *RMProxy) UpdateApplication(request *si.ApplicationRequest) error {
	if rmp.GetResourceManagerCallback(request.RmID) == nil {
		return fmt.Errorf("received ApplicationRequest, but RmID=\"%s\" not registered", request.RmID)
	}

	// Update New apps
	for _, app := range request.New {
		app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
	}

	// Update Remove apps
	for _, app := range request.Remove {
		app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
	}

	rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
	return nil
}

func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) error {
	if rmp.GetResourceManagerCallback(request.RmID) == nil {
		return fmt.Errorf("received NodeRequest, but RmID=\"%s\" not registered", request.RmID)
	}

	for _, node := range request.Nodes {
		if len(node.GetAttributes()) == 0 {
			node.Attributes = map[string]string{}
		}
		partition := node.Attributes[siCommon.NodePartition]
		node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
	}

	rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
	return nil
}

// Triggers scheduler to reload configuration and apply the changes on-the-fly to the scheduler itself.
func (rmp *RMProxy) UpdateConfiguration(request *si.UpdateConfigurationRequest) error {
	rmp.RLock()
	defer rmp.RUnlock()

	c := make(chan *rmevent.Result)
	go func() {
		rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMConfigUpdateEvent{
			RmID:        request.RmID,
			PolicyGroup: request.PolicyGroup,
			Config:      request.Config,
			ExtraConfig: request.ExtraConfig,
			Channel:     c,
		})
	}()

	// Wait from channel
	result := <-c
	if !result.Succeeded {
		return fmt.Errorf("update of configuration failed: %v", result.Reason)
	}
	return nil
}

func (rmp *RMProxy) Stop() {
	log.Log(log.RMProxy).Info("Stopping RMProxy")
	close(rmp.stop)
}
