blob: d47ef59d5af9090691e4d750a248ac89bfa4c087 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package examples
import (
"github.com/apache/yunikorn-core/pkg/entrypoint"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// exampleRMCallback is the callback handed to the scheduler when the example
// resource manager registers itself. It embeds mock.ResourceManagerCallback
// (presumably for the callback methods this type does not define itself;
// verify against the mock package) and keeps a record of everything the
// update handlers defined on this type are told about.
type exampleRMCallback struct {
	mock.ResourceManagerCallback

	// Application IDs mapped to true, split by the scheduler's decision.
	acceptedApplications, rejectedApplications map[string]bool
	// Node IDs mapped to true, split by the scheduler's decision.
	acceptedNodes, rejectedNodes map[string]bool

	// nodeAllocations lists the allocations reported per node ID.
	nodeAllocations map[string][]*si.Allocation
	// Allocations indexes the reported allocations by allocation key.
	Allocations map[string]*si.Allocation

	// The update handlers take this lock before touching any map above.
	locking.RWMutex
}
// UpdateAllocation records the content of an allocation response. Each new
// allocation is stored under its allocation key and appended to the list kept
// for its node; each released allocation is removed from the key index. The
// write lock is held for the whole update. It always returns nil.
func (m *exampleRMCallback) UpdateAllocation(response *si.AllocationResponse) error {
	m.Lock()
	defer m.Unlock()

	for _, added := range response.New {
		m.Allocations[added.AllocationKey] = added
		// A node seen for the first time has no entry yet; appending to the
		// resulting nil slice starts its list with this allocation.
		m.nodeAllocations[added.NodeID] = append(m.nodeAllocations[added.NodeID], added)
	}

	for _, released := range response.Released {
		// Only the key index is pruned here; the per-node list is not touched.
		delete(m.Allocations, released.AllocationKey)
	}

	return nil
}
// UpdateApplication records the content of an application response. An
// application ID is kept in exactly one of the accepted/rejected sets: adding
// it to one removes it from the other. The write lock is held for the whole
// update. It always returns nil.
func (m *exampleRMCallback) UpdateApplication(response *si.ApplicationResponse) error {
	m.Lock()
	defer m.Unlock()

	for _, accepted := range response.Accepted {
		id := accepted.ApplicationID
		m.acceptedApplications[id] = true
		delete(m.rejectedApplications, id)
	}

	for _, rejected := range response.Rejected {
		id := rejected.ApplicationID
		m.rejectedApplications[id] = true
		delete(m.acceptedApplications, id)
	}

	return nil
}
// UpdateNode records the content of a node response. A node ID is kept in
// exactly one of the accepted/rejected sets: adding it to one removes it from
// the other. The write lock is held for the whole update. It always returns
// nil.
func (m *exampleRMCallback) UpdateNode(response *si.NodeResponse) error {
	m.Lock()
	defer m.Unlock()

	for _, accepted := range response.Accepted {
		id := accepted.NodeID
		m.acceptedNodes[id] = true
		delete(m.rejectedNodes, id)
	}

	for _, rejected := range response.Rejected {
		id := rejected.NodeID
		m.rejectedNodes[id] = true
		delete(m.acceptedNodes, id)
	}

	return nil
}
// newExampleRMCallback returns a callback whose bookkeeping maps are all
// allocated and empty, so the update handlers can write to them right away.
// The embedded fields are left at their zero value.
func newExampleRMCallback() *exampleRMCallback {
	callback := new(exampleRMCallback)

	callback.acceptedApplications = map[string]bool{}
	callback.rejectedApplications = map[string]bool{}
	callback.acceptedNodes = map[string]bool{}
	callback.rejectedNodes = map[string]bool{}
	callback.nodeAllocations = map[string][]*si.Allocation{}
	callback.Allocations = map[string]*si.Allocation{}

	return callback
}
// exampleOfRunYourOwnRM shows the sequence of calls a custom resource manager
// makes against the scheduler:
//
//  1. start the services and obtain the RM proxy,
//  2. register the resource manager together with its queue configuration,
//  3. register two nodes,
//  4. add an application to queue "a",
//  5. send one allocation ask for that application.
//
// Every step panics if the proxy returns an error.
func exampleOfRunYourOwnRM() {
	// The RM ID is very important: it must not be empty and has to be set on
	// ALL update requests, so it is defined once and reused below.
	const rmID = "rm:123"

	// Start all services
	serviceContext := entrypoint.StartAllServices()
	proxy := serviceContext.RMProxy

	// Setup queues
	// NOTE(review): the nesting of this YAML depends on the leading whitespace
	// inside the literal; verify it is intact in the checked-in file.
	configData := `
partitions:
-
name: default
queues:
-
name: root
children:
- a
resources:
guaranteed:
memory: 200
vcore: 20
max:
memory: 200
vcore: 20
-
name: a
resources:
guaranteed:
memory: 100
vcore: 10
max:
memory: 150
vcore: 20
`
	buildInfo := map[string]string{"k": "v"}

	// Register RM
	mockRM := newExampleRMCallback()
	_, err := proxy.RegisterResourceManager(
		&si.RegisterResourceManagerRequest{
			RmID:        rmID,
			PolicyGroup: "policygroup",
			Version:     "0.0.2",
			BuildInfo:   buildInfo,
			Config:      configData,
		}, mockRM)
	if err != nil {
		panic(err)
	}

	// Register the nodes
	err = proxy.UpdateNode(&si.NodeRequest{
		Nodes: []*si.NodeInfo{
			{
				NodeID:     "node-1:1234",
				Attributes: map[string]string{},
				SchedulableResource: &si.Resource{
					Resources: map[string]*si.Quantity{
						"memory": {Value: 100},
						"vcore":  {Value: 20},
					},
				},
			},
			{
				NodeID:     "node-2:1234",
				Attributes: map[string]string{},
				SchedulableResource: &si.Resource{
					Resources: map[string]*si.Quantity{
						"memory": {Value: 100},
						"vcore":  {Value: 20},
					},
				},
			},
		},
		RmID: rmID,
	})
	if err != nil {
		panic(err)
	}

	// (IMPORTANT)
	// Unlike Kubernetes, an allocation ask needs an application. You can put
	// all pod requests under the same app. The app name can be anything
	// non-empty. The partition name can be empty (and internally becomes
	// "default").
	err = proxy.UpdateApplication(&si.ApplicationRequest{
		New: []*si.AddApplicationRequest{
			{
				ApplicationID: "app-1",
				QueueName:     "a",
				PartitionName: "",
				Ugi: &si.UserGroupInformation{
					User: "testuser",
				},
			},
		},
		RmID: rmID,
	})
	if err != nil {
		panic(err)
	}

	// Refer to mock/rm_callback.go:109
	// You need to check that the app was accepted by the scheduler before
	// proceeding.

	// Send request
	err = proxy.UpdateAllocation(&si.AllocationRequest{
		Asks: []*si.AllocationAsk{
			{
				AllocationKey: "alloc-1",
				ResourceAsk: &si.Resource{
					Resources: map[string]*si.Quantity{
						"memory": {Value: 10},
						"vcore":  {Value: 1},
					},
				},
				ApplicationID: "app-1",
			},
		},
		RmID: rmID,
	})
	// The result of the ask is checked like every other call, so a failure
	// is not silently dropped.
	if err != nil {
		panic(err)
	}
}