blob: ecf06bea2875ddf60a679cb205ce39444047f411 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rmproxy
import (
"fmt"
"reflect"
"strconv"
"sync"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/handler"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// Gateway to talk to ResourceManager (behind grpc/API of scheduler-interface)
type RMProxy struct {
EventHandlers handler.EventHandlers
stop chan struct{}
// Internal fields
pendingRMEvents chan interface{}
rmIDToCallback map[string]api.ResourceManagerCallback
sync.RWMutex
}
func (rmp *RMProxy) GetRMEventHandler() handler.EventHandler {
return rmp
}
func enqueueAndCheckFull(queue chan interface{}, ev interface{}) {
select {
case queue <- ev:
log.Log(log.RMProxy).Debug("enqueue event",
zap.Stringer("eventType", reflect.TypeOf(ev)),
zap.Any("event", ev),
zap.Int("currentQueueSize", len(queue)))
default:
log.Log(log.RMProxy).DPanic("failed to enqueue event",
zap.Stringer("event", reflect.TypeOf(ev)))
}
}
func (rmp *RMProxy) HandleEvent(ev interface{}) {
enqueueAndCheckFull(rmp.pendingRMEvents, ev)
}
func NewRMProxy() *RMProxy {
rm := &RMProxy{
rmIDToCallback: make(map[string]api.ResourceManagerCallback),
pendingRMEvents: make(chan interface{}, 1024*1024),
stop: make(chan struct{}),
}
return rm
}
func (rmp *RMProxy) StartService(handlers handler.EventHandlers) {
rmp.EventHandlers = handlers
go rmp.handleRMEvents()
}
func (rmp *RMProxy) handleUpdateResponseError(rmID string, err error) {
log.Log(log.RMProxy).Error("failed to handle response",
zap.String("rmID", rmID),
zap.Error(err))
}
func (rmp *RMProxy) processAllocationUpdateEvent(event *rmevent.RMNewAllocationsEvent) {
allocationsCount := len(event.Allocations)
if allocationsCount != 0 {
rmp.RLock()
defer rmp.RUnlock()
response := &si.AllocationResponse{
New: event.Allocations,
}
rmp.triggerUpdateAllocation(event.RmID, response)
metrics.GetSchedulerMetrics().AddAllocatedContainers(len(event.Allocations))
}
// Done, notify channel
event.Channel <- &rmevent.Result{
Succeeded: true,
Reason: "no. of allocations: " + strconv.Itoa(allocationsCount),
}
}
func (rmp *RMProxy) processApplicationUpdateEvent(event *rmevent.RMApplicationUpdateEvent) {
rmp.RLock()
defer rmp.RUnlock()
if len(event.RejectedApplications) == 0 && len(event.AcceptedApplications) == 0 && len(event.UpdatedApplications) == 0 {
return
}
response := &si.ApplicationResponse{
Rejected: event.RejectedApplications,
Accepted: event.AcceptedApplications,
Updated: event.UpdatedApplications,
}
if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
if err := callback.UpdateApplication(response); err != nil {
rmp.handleUpdateResponseError(event.RmID, err)
}
} else {
log.Log(log.RMProxy).DPanic("RM is not registered",
zap.String("rmID", event.RmID))
}
// update app metrics
if len(event.RejectedApplications) > 0 {
metrics.GetSchedulerMetrics().AddTotalApplicationsRejected(len(event.RejectedApplications))
}
if len(event.AcceptedApplications) > 0 {
metrics.GetSchedulerMetrics().AddTotalApplicationsAccepted(len(event.AcceptedApplications))
}
}
func (rmp *RMProxy) processRMReleaseAllocationEvent(event *rmevent.RMReleaseAllocationEvent) {
allocationsCount := len(event.ReleasedAllocations)
if allocationsCount != 0 {
rmp.RLock()
defer rmp.RUnlock()
response := &si.AllocationResponse{
Released: event.ReleasedAllocations,
}
rmp.triggerUpdateAllocation(event.RmID, response)
metrics.GetSchedulerMetrics().AddReleasedContainers(len(event.ReleasedAllocations))
}
// Done, notify channel
event.Channel <- &rmevent.Result{
Succeeded: true,
Reason: "no. of allocations: " + strconv.Itoa(allocationsCount),
}
}
func (rmp *RMProxy) triggerUpdateAllocation(rmID string, response *si.AllocationResponse) {
if callback := rmp.rmIDToCallback[rmID]; callback != nil {
if err := callback.UpdateAllocation(response); err != nil {
rmp.handleUpdateResponseError(rmID, err)
}
} else {
log.Log(log.RMProxy).DPanic("RM is not registered",
zap.String("rmID", rmID))
}
}
func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event *rmevent.RMReleaseAllocationAskEvent) {
rmp.RLock()
defer rmp.RUnlock()
if len(event.ReleasedAllocationAsks) == 0 {
return
}
response := &si.AllocationResponse{
ReleasedAsks: event.ReleasedAllocationAsks,
}
rmp.triggerUpdateAllocation(event.RmID, response)
}
func (rmp *RMProxy) processRMRejectedAllocationAskEvent(event *rmevent.RMRejectedAllocationAskEvent) {
rmp.RLock()
defer rmp.RUnlock()
if len(event.RejectedAllocationAsks) == 0 {
return
}
response := &si.AllocationResponse{
Rejected: event.RejectedAllocationAsks,
}
rmp.triggerUpdateAllocation(event.RmID, response)
metrics.GetSchedulerMetrics().AddRejectedContainers(len(event.RejectedAllocationAsks))
}
func (rmp *RMProxy) processRMRejectedAllocationEvent(event *rmevent.RMRejectedAllocationEvent) {
rmp.RLock()
defer rmp.RUnlock()
if len(event.RejectedAllocations) == 0 {
return
}
response := &si.AllocationResponse{
RejectedAllocations: event.RejectedAllocations,
}
rmp.triggerUpdateAllocation(event.RmID, response)
metrics.GetSchedulerMetrics().AddRejectedContainers(len(event.RejectedAllocations))
}
func (rmp *RMProxy) processRMNodeUpdateEvent(event *rmevent.RMNodeUpdateEvent) {
rmp.RLock()
defer rmp.RUnlock()
if len(event.RejectedNodes) == 0 && len(event.AcceptedNodes) == 0 {
return
}
response := &si.NodeResponse{
Rejected: event.RejectedNodes,
Accepted: event.AcceptedNodes,
}
if callback := rmp.rmIDToCallback[event.RmID]; callback != nil {
if err := callback.UpdateNode(response); err != nil {
rmp.handleUpdateResponseError(event.RmID, err)
}
} else {
log.Log(log.RMProxy).DPanic("RM is not registered",
zap.String("rmID", event.RmID))
}
}
func (rmp *RMProxy) handleRMEvents() {
for {
select {
case ev := <-rmp.pendingRMEvents:
switch v := ev.(type) {
case *rmevent.RMNewAllocationsEvent:
rmp.processAllocationUpdateEvent(v)
case *rmevent.RMApplicationUpdateEvent:
rmp.processApplicationUpdateEvent(v)
case *rmevent.RMReleaseAllocationEvent:
rmp.processRMReleaseAllocationEvent(v)
case *rmevent.RMRejectedAllocationAskEvent:
rmp.processRMRejectedAllocationAskEvent(v)
case *rmevent.RMRejectedAllocationEvent:
rmp.processRMRejectedAllocationEvent(v)
case *rmevent.RMNodeUpdateEvent:
rmp.processRMNodeUpdateEvent(v)
case *rmevent.RMReleaseAllocationAskEvent:
rmp.processRMReleaseAllocationAskEvent(v)
default:
panic(fmt.Sprintf("%s is not an acceptable type for RM event.", reflect.TypeOf(v).String()))
}
case <-rmp.stop:
return
}
}
}
func (rmp *RMProxy) RegisterResourceManager(request *si.RegisterResourceManagerRequest, callback api.ResourceManagerCallback) (*si.RegisterResourceManagerResponse, error) {
rmp.Lock()
defer rmp.Unlock()
c := make(chan *rmevent.Result)
// If this is a re-register we need to clean up first
if rmp.rmIDToCallback[request.RmID] != nil {
go func() {
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
&rmevent.RMPartitionsRemoveEvent{
RmID: request.RmID,
Channel: c,
})
}()
result := <-c
close(c)
if !result.Succeeded {
return nil, fmt.Errorf("registration of RM failed: %v", result.Reason)
}
}
c = make(chan *rmevent.Result)
// Add new RM.
go func() {
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
&rmevent.RMRegistrationEvent{
Registration: request,
Channel: c,
})
}()
// Wait from channel
result := <-c
if result.Succeeded {
rmp.rmIDToCallback[request.RmID] = callback
// RM callback can optionally implement one or more scheduler plugin interfaces,
// register scheduler plugin if the callback implements any plugin interface
plugins.RegisterSchedulerPlugin(callback)
return &si.RegisterResourceManagerResponse{}, nil
}
return nil, fmt.Errorf("registration of RM failed: %v", result.Reason)
}
func (rmp *RMProxy) GetResourceManagerCallback(rmID string) api.ResourceManagerCallback {
rmp.RLock()
defer rmp.RUnlock()
return rmp.rmIDToCallback[rmID]
}
func (rmp *RMProxy) UpdateAllocation(request *si.AllocationRequest) error {
if rmp.GetResourceManagerCallback(request.RmID) == nil {
return fmt.Errorf("received AllocationRequest, but RmID=\"%s\" not registered", request.RmID)
}
// Update allocations
for _, alloc := range request.Allocations {
alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
}
// Update asks
for _, ask := range request.Asks {
ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
}
// Update releases
if request.Releases != nil {
for _, rel := range request.Releases.AllocationsToRelease {
rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
for _, rel := range request.Releases.AllocationAsksToRelease {
rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
}
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
return nil
}
func (rmp *RMProxy) UpdateApplication(request *si.ApplicationRequest) error {
if rmp.GetResourceManagerCallback(request.RmID) == nil {
return fmt.Errorf("received ApplicationRequest, but RmID=\"%s\" not registered", request.RmID)
}
// Update New apps
for _, app := range request.New {
app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
}
// Update Remove apps
for _, app := range request.Remove {
app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
}
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
return nil
}
func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) error {
if rmp.GetResourceManagerCallback(request.RmID) == nil {
return fmt.Errorf("received NodeRequest, but RmID=\"%s\" not registered", request.RmID)
}
for _, node := range request.Nodes {
if len(node.GetAttributes()) == 0 {
node.Attributes = map[string]string{}
}
partition := node.Attributes[siCommon.NodePartition]
node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
}
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
return nil
}
// Triggers scheduler to reload configuration and apply the changes on-the-fly to the scheduler itself.
func (rmp *RMProxy) UpdateConfiguration(request *si.UpdateConfigurationRequest) error {
rmp.RLock()
defer rmp.RUnlock()
c := make(chan *rmevent.Result)
go func() {
rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMConfigUpdateEvent{
RmID: request.RmID,
PolicyGroup: request.PolicyGroup,
Config: request.Config,
ExtraConfig: request.ExtraConfig,
Channel: c,
})
}()
// Wait from channel
result := <-c
if !result.Succeeded {
return fmt.Errorf("update of configuration failed: %v", result.Reason)
}
return nil
}
func (rmp *RMProxy) Stop() {
log.Log(log.RMProxy).Info("Stopping RMProxy")
close(rmp.stop)
}