blob: f05d8e8ae34a52568fc4fd2eff170bfe9f3ce461 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rmproxy
import (
"fmt"
"reflect"
"sync"
"go.uber.org/zap"
"github.com/apache/incubator-yunikorn-core/pkg/api"
"github.com/apache/incubator-yunikorn-core/pkg/common"
"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
"github.com/apache/incubator-yunikorn-core/pkg/handler"
"github.com/apache/incubator-yunikorn-core/pkg/log"
"github.com/apache/incubator-yunikorn-core/pkg/metrics"
"github.com/apache/incubator-yunikorn-core/pkg/plugins"
"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
siCommon "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
)
// RMProxy is the gateway used to talk to ResourceManagers (behind the
// grpc/API of the scheduler-interface). It forwards RM requests to the
// scheduler and delivers scheduler responses back to the registered RM
// callbacks through an internal event queue.
type RMProxy struct {
	// EventHandlers holds the scheduler-side handlers; set by StartService.
	EventHandlers handler.EventHandlers
	// Internal fields
	// pendingRMEvents queues RM-bound events; drained by handleRMEvents.
	pendingRMEvents chan interface{}
	// rmIDToCallback maps an RM ID to the callback registered for it.
	rmIDToCallback map[string]api.ResourceManagerCallback
	// config version is tracked per RM,
	// it is used to determine if configs need to be reloaded
	rmIDToConfigWatcher map[string]*configs.ConfigWatcher
	// RWMutex guards rmIDToCallback and rmIDToConfigWatcher.
	sync.RWMutex
}
// GetRMEventHandler exposes the proxy itself as the handler that accepts
// events destined for resource managers.
func (rmp *RMProxy) GetRMEventHandler() handler.EventHandler {
	var rmHandler handler.EventHandler = rmp
	return rmHandler
}
// enqueueAndCheckFull performs a non-blocking send of ev onto queue.
// On success the event and the resulting queue length are logged at Debug
// level. If the queue has no free capacity the send is not retried; instead
// the failure is logged at Panic level (which panics) rather than blocking
// the caller.
func enqueueAndCheckFull(queue chan interface{}, ev interface{}) {
	select {
	case queue <- ev:
		log.Logger().Debug("enqueue event",
			zap.Any("event", ev),
			zap.Int("currentQueueSize", len(queue)))
	default:
		// %T is nil-safe and renders "<nil>" for a nil event, whereas
		// reflect.TypeOf(nil).String() dereferences a nil Type and panics
		// before the intended log message is written. For non-nil events the
		// output is identical to reflect.TypeOf(ev).String().
		log.Logger().Panic("failed to enqueue event",
			zap.String("event", fmt.Sprintf("%T", ev)))
	}
}
// HandleEvent queues an RM-bound event for asynchronous processing by the
// handleRMEvents loop. The enqueue never blocks; a full queue is reported by
// enqueueAndCheckFull at Panic level.
func (rmp *RMProxy) HandleEvent(ev interface{}) {
	queue := rmp.pendingRMEvents
	enqueueAndCheckFull(queue, ev)
}
// NewRMProxy builds an RMProxy with empty per-RM registries and an event
// queue buffered for 1024*1024 pending events. Events are only consumed
// after StartService has been called.
func NewRMProxy() *RMProxy {
	const pendingEventCapacity = 1024 * 1024
	return &RMProxy{
		pendingRMEvents:     make(chan interface{}, pendingEventCapacity),
		rmIDToCallback:      make(map[string]api.ResourceManagerCallback),
		rmIDToConfigWatcher: make(map[string]*configs.ConfigWatcher),
	}
}
// StartService stores the scheduler-side event handlers and launches the
// goroutine that drains the pending RM event queue.
func (rmp *RMProxy) StartService(handlers handler.EventHandlers) {
	// The handlers are stored before the consumer loop is started.
	rmp.EventHandlers = handlers
	consumeEvents := rmp.handleRMEvents
	go consumeEvents()
}
// handleRMRecvUpdateResponseError logs an error returned by an RM callback
// while it was receiving an UpdateResponse. The error is not propagated.
func (rmp *RMProxy) handleRMRecvUpdateResponseError(rmID string, err error) {
	fields := []zap.Field{
		zap.String("rmID", rmID),
		zap.Error(err),
	}
	log.Logger().Error("failed to handle response", fields...)
}
// processUpdateResponse delivers response to the callback registered for
// rmID. An error returned by the callback is logged via
// handleRMRecvUpdateResponseError. An unknown rmID is logged at DPanic level.
func (rmp *RMProxy) processUpdateResponse(rmID string, response *si.UpdateResponse) {
	// Look the callback up under the read lock (inside
	// GetResourceManagerCallback), but invoke it WITHOUT holding the lock.
	// The callback is RM code that may call back into the proxy (e.g. Update,
	// which takes the read lock again). Recursive read locking of a
	// sync.RWMutex deadlocks as soon as a writer such as
	// RegisterResourceManager is waiting for the lock.
	callback := rmp.GetResourceManagerCallback(rmID)
	if callback == nil {
		log.Logger().DPanic("RM is not registered",
			zap.String("rmID", rmID))
		return
	}
	if err := callback.RecvUpdateResponse(response); err != nil {
		rmp.handleRMRecvUpdateResponseError(rmID, err)
	}
}
// processAllocationUpdateEvent forwards newly made allocations to the RM and
// adds their count to the allocated-containers metric. An event without
// allocations is ignored.
func (rmp *RMProxy) processAllocationUpdateEvent(event *rmevent.RMNewAllocationsEvent) {
	allocations := event.Allocations
	if len(allocations) > 0 {
		rmp.processUpdateResponse(event.RmID, &si.UpdateResponse{NewAllocations: allocations})
		metrics.GetSchedulerMetrics().AddAllocatedContainers(len(allocations))
	}
}
// processApplicationUpdateEvent forwards rejected, accepted and updated
// applications to the RM in a single UpdateResponse, then records the
// rejected and accepted counts in the scheduler metrics. Updated applications
// are forwarded but not counted. An event with no applications is ignored.
func (rmp *RMProxy) processApplicationUpdateEvent(event *rmevent.RMApplicationUpdateEvent) {
	rejected := event.RejectedApplications
	accepted := event.AcceptedApplications
	updated := event.UpdatedApplications
	if len(rejected)+len(accepted)+len(updated) == 0 {
		return
	}
	rmp.processUpdateResponse(event.RmID, &si.UpdateResponse{
		RejectedApplications: rejected,
		AcceptedApplications: accepted,
		UpdatedApplications:  updated,
	})
	if n := len(rejected); n > 0 {
		metrics.GetSchedulerMetrics().AddTotalApplicationsRejected(n)
	}
	if n := len(accepted); n > 0 {
		metrics.GetSchedulerMetrics().AddTotalApplicationsAdded(n)
	}
}
// processRMReleaseAllocationEvent tells the RM which allocations were released
// and adds their count to the released-containers metric. An event without
// released allocations is ignored.
func (rmp *RMProxy) processRMReleaseAllocationEvent(event *rmevent.RMReleaseAllocationEvent) {
	released := event.ReleasedAllocations
	if len(released) > 0 {
		rmp.processUpdateResponse(event.RmID, &si.UpdateResponse{ReleasedAllocations: released})
		metrics.GetSchedulerMetrics().AddReleasedContainers(len(released))
	}
}
// processRMReleaseAllocationAskEvent tells the RM which allocation asks were
// released. No metric is updated. An event without released asks is ignored.
func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event *rmevent.RMReleaseAllocationAskEvent) {
	releasedAsks := event.ReleasedAllocationAsks
	if len(releasedAsks) > 0 {
		rmp.processUpdateResponse(event.RmID, &si.UpdateResponse{ReleasedAllocationAsks: releasedAsks})
	}
}
// processUpdatePartitionConfigsEvent handles an RMRejectedAllocationAskEvent.
// Despite its name, it has nothing to do with partition configs. It sends the
// rejected allocation asks to the RM as RejectedAllocations and adds their
// count to the rejected-containers metric. An event without rejected asks is
// ignored.
// NOTE(review): consider renaming (e.g. processRMRejectedAllocationAskEvent)
// once all callers in the package are known.
func (rmp *RMProxy) processUpdatePartitionConfigsEvent(event *rmevent.RMRejectedAllocationAskEvent) {
	rejectedAsks := event.RejectedAllocationAsks
	if len(rejectedAsks) > 0 {
		rmp.processUpdateResponse(event.RmID, &si.UpdateResponse{RejectedAllocations: rejectedAsks})
		metrics.GetSchedulerMetrics().AddRejectedContainers(len(rejectedAsks))
	}
}
// processRMNodeUpdateEvent reports the accepted and rejected nodes back to the
// RM. An event with neither accepted nor rejected nodes is ignored.
func (rmp *RMProxy) processRMNodeUpdateEvent(event *rmevent.RMNodeUpdateEvent) {
	rejected, accepted := event.RejectedNodes, event.AcceptedNodes
	if len(rejected)+len(accepted) == 0 {
		return
	}
	rmp.processUpdateResponse(event.RmID, &si.UpdateResponse{
		RejectedNodes: rejected,
		AcceptedNodes: accepted,
	})
}
// handleRMEvents consumes pendingRMEvents and dispatches each event to the
// matching process* method. An event of any other type causes a panic. The
// loop returns if pendingRMEvents is ever closed, rather than spinning on the
// nil values a closed channel yields.
func (rmp *RMProxy) handleRMEvents() {
	for ev := range rmp.pendingRMEvents {
		switch v := ev.(type) {
		case *rmevent.RMNewAllocationsEvent:
			rmp.processAllocationUpdateEvent(v)
		case *rmevent.RMApplicationUpdateEvent:
			rmp.processApplicationUpdateEvent(v)
		case *rmevent.RMReleaseAllocationEvent:
			rmp.processRMReleaseAllocationEvent(v)
		case *rmevent.RMRejectedAllocationAskEvent:
			rmp.processUpdatePartitionConfigsEvent(v)
		case *rmevent.RMNodeUpdateEvent:
			rmp.processRMNodeUpdateEvent(v)
		case *rmevent.RMReleaseAllocationAskEvent:
			rmp.processRMReleaseAllocationAskEvent(v)
		default:
			// Formatting the reflect.Type with %v is nil-safe ("<nil>" for a
			// nil event), whereas calling String() on the nil Type returned by
			// reflect.TypeOf(nil) is a nil dereference that would mask this
			// message. For non-nil values %v prints Type.String().
			panic(fmt.Sprintf("%v is not an acceptable type for RM event.", reflect.TypeOf(v)))
		}
	}
}
// RegisterResourceManager registers, or re-registers, an RM with the scheduler.
//
// The whole call runs under the proxy's write lock. On re-registration (a
// callback is already stored for request.RmID), the scheduler is first sent
// an RMPartitionsRemoveEvent for that RM, and a failed result aborts the
// registration. The scheduler is then sent an RMRegistrationEvent, and the
// call blocks until it replies on the result channel.
//
// On success, the following are stored for the RM:
//   - a config watcher, which is not started here;
//   - the callback.
// The callback is then passed to plugins.RegisterSchedulerPlugin. On failure,
// an error containing the scheduler's reason is returned.
//
// NOTE(review): the write lock is held while waiting on the scheduler.
// processUpdateResponse takes the read lock, so RM-bound events stall until
// registration completes. Confirm the scheduler never needs those events to
// finish handling the registration events.
func (rmp *RMProxy) RegisterResourceManager(request *si.RegisterResourceManagerRequest, callback api.ResourceManagerCallback) (*si.RegisterResourceManagerResponse, error) {
	rmp.Lock()
	defer rmp.Unlock()
	c := make(chan *rmevent.Result)
	// If this is a re-register we need to clean up first
	if rmp.rmIDToCallback[request.RmID] != nil {
		// The event is sent from a separate goroutine so this goroutine is
		// already waiting on c when the scheduler replies.
		go func() {
			rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
				&rmevent.RMPartitionsRemoveEvent{
					RmID:    request.RmID,
					Channel: c,
				})
		}()
		result := <-c
		// NOTE(review): the receiver closes c here. By Go convention only the
		// sender closes a channel; confirm the scheduler sends exactly one
		// result and never closes c itself, or this can panic.
		close(c)
		if !result.Succeeded {
			return nil, fmt.Errorf("registration of RM failed: %v", result.Reason)
		}
	}
	// A fresh channel is used for the registration reply.
	c = make(chan *rmevent.Result)
	// Add new RM.
	go func() {
		rmp.EventHandlers.SchedulerEventHandler.HandleEvent(
			&rmevent.RMRegistrationEvent{
				Registration: request,
				Channel:      c,
			})
	}()
	// Wait from channel
	result := <-c
	if result.Succeeded {
		// create a config watcher for this RM
		// config watcher will only be started when a reload is triggered
		// it is configured with a expiration time, and will be auto exit once that reaches
		configWatcher := configs.CreateConfigWatcher(request.RmID, request.PolicyGroup, configs.DefaultConfigWatcherDuration)
		configWatcher.RegisterCallback(&configurationReloader{
			rmID:    request.RmID,
			rmProxy: rmp,
		})
		// Any previous watcher/callback for this RM ID is overwritten.
		rmp.rmIDToConfigWatcher[request.RmID] = configWatcher
		rmp.rmIDToCallback[request.RmID] = callback
		// RM callback can optionally implement one or more scheduler plugin interfaces,
		// register scheduler plugin if the callback implements any plugin interface
		plugins.RegisterSchedulerPlugin(callback)
		return &si.RegisterResourceManagerResponse{}, nil
	}
	// NOTE(review): on a failed re-registration, the partitions were already
	// removed above but the old callback and config watcher stay in the maps.
	// Confirm this is the intended state.
	return nil, fmt.Errorf("registration of RM failed: %v", result.Reason)
}
// GetResourceManagerCallback returns the callback registered for rmID, or nil
// when no RM with that ID has been registered. Safe for concurrent use.
func (rmp *RMProxy) GetResourceManagerCallback(rmID string) api.ResourceManagerCallback {
	rmp.RLock()
	callback, found := rmp.rmIDToCallback[rmID]
	rmp.RUnlock()
	if !found {
		return nil
	}
	return callback
}
// normalizeUpdateRequestByRMId rewrites, in place, every partition name carried
// by request so that it is qualified by request.RmID, using
// common.GetNormalizedPartitionName. It covers:
//   - asks;
//   - new and updated nodes, whose partition is stored in the
//     siCommon.NodePartition attribute;
//   - new and removed applications;
//   - allocations and allocation asks to release.
// Ranging over a nil or empty slice is a no-op, so no length guards are needed.
func normalizeUpdateRequestByRMId(request *si.UpdateRequest) {
	rmID := request.RmID
	for _, ask := range request.Asks {
		ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, rmID)
	}
	for _, node := range request.NewSchedulableNodes {
		node.Attributes = normalizeNodePartitionAttribute(node.Attributes, rmID)
	}
	for _, app := range request.NewApplications {
		app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, rmID)
	}
	for _, node := range request.UpdatedNodes {
		node.Attributes = normalizeNodePartitionAttribute(node.Attributes, rmID)
	}
	for _, app := range request.RemoveApplications {
		app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, rmID)
	}
	if request.Releases != nil {
		for _, rel := range request.Releases.AllocationsToRelease {
			rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, rmID)
		}
		for _, rel := range request.Releases.AllocationAsksToRelease {
			rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, rmID)
		}
	}
}

// normalizeNodePartitionAttribute qualifies the siCommon.NodePartition entry of
// attrs with rmID and returns the map, which may be newly allocated. A nil map
// is replaced by a fresh one: assigning into a nil map panics, and a node sent
// without any attributes would otherwise crash the goroutine running the
// normalization.
func normalizeNodePartitionAttribute(attrs map[string]string, rmID string) map[string]string {
	if attrs == nil {
		attrs = make(map[string]string, 1)
	}
	attrs[siCommon.NodePartition] = common.GetNormalizedPartitionName(attrs[siCommon.NodePartition], rmID)
	return attrs
}
// Update checks that request.RmID belongs to a registered RM and then, on a
// separate goroutine, normalizes the request's partition names and forwards it
// to the scheduler as an RMUpdateRequestEvent. It returns as soon as the work
// has been handed off; an error is returned only for an unregistered RM.
func (rmp *RMProxy) Update(request *si.UpdateRequest) error {
	if cb := rmp.GetResourceManagerCallback(request.RmID); cb == nil {
		return fmt.Errorf("received UpdateRequest, but RmID=\"%s\" not registered", request.RmID)
	}
	forward := func() {
		normalizeUpdateRequestByRMId(request)
		updateEvent := &rmevent.RMUpdateRequestEvent{Request: request}
		rmp.EventHandlers.SchedulerEventHandler.HandleEvent(updateEvent)
	}
	go forward()
	return nil
}
// ReloadConfiguration triggers the scheduler to reload its configuration for
// the given RM and apply the changes on-the-fly. It does so by running that
// RM's config watcher. An error is returned only when no config watcher exists
// for rmID, i.e. the RM never registered successfully.
func (rmp *RMProxy) ReloadConfiguration(rmID string) error {
	rmp.RLock()
	defer rmp.RUnlock()
	if cw, ok := rmp.rmIDToConfigWatcher[rmID]; ok {
		// The watcher may have stopped after a previous reload finished or
		// expired, so it is (re)started on every call. Run is a no-op when
		// the watcher is already running.
		cw.Run()
		return nil
	}
	return fmt.Errorf("failed to reload configuration, because RM %s is unknown to the scheduler", rmID)
}
// configurationReloader is the callback registered on each RM's config
// watcher (see RegisterResourceManager); it performs the actual reload.
type configurationReloader struct {
	// rmID identifies the RM whose configuration is reloaded.
	rmID string
	// rmProxy provides access to the scheduler event handlers.
	rmProxy *RMProxy
}
// DoReloadConfiguration sends an RMConfigUpdateEvent for the reloader's RM to
// the scheduler and blocks until a result arrives on the event's channel. An
// unsuccessful result is returned as an error that includes the full result.
// NOTE(review): unlike RegisterResourceManager, the event is handed over on
// this goroutine. With an unbuffered result channel, this relies on HandleEvent
// not waiting for the reply itself; confirm.
func (cr configurationReloader) DoReloadConfiguration() error {
	resultCh := make(chan *rmevent.Result)
	updateEvent := &rmevent.RMConfigUpdateEvent{
		RmID:    cr.rmID,
		Channel: resultCh,
	}
	cr.rmProxy.EventHandlers.SchedulerEventHandler.HandleEvent(updateEvent)
	if result := <-resultCh; !result.Succeeded {
		return fmt.Errorf("failed to update configuration for RM %s, result: %v", cr.rmID, result)
	}
	return nil
}