blob: eca20f66328b972e62a4aa715033140e646ba8a4 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webservice
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/yaml.v3"
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/metrics/history"
"github.com/apache/yunikorn-core/pkg/scheduler"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// Failure messages shared by the assertions in the handler tests.
const unmarshalError = "Failed to unmarshal error response from response body"
const statusCodeError = "Incorrect Status code"
const jsonMessageError = "JSON error message is incorrect"
// Name of the default partition without the cluster (RM) ID prefix.
const partitionNameWithoutClusterID = "default"
// Default partition name prefixed with the "[rm-123]" cluster ID.
// NOTE(review): presumably the form produced by common.GetNormalizedPartitionName — confirm.
const normalizedPartitionName = "[rm-123]default"
// startConf is the initial scheduler config loaded by the cluster config tests:
// a single default partition using the "fair" node sort policy and a root
// queue carrying two properties.
const startConf = `
partitions:
- name: default
nodesortpolicy:
type: fair
queues:
- name: root
properties:
first: "some value with spaces"
second: somethingElse
`
// updatedConf is applied on top of startConf by the cluster config tests: it
// switches the node sort policy to "binpacking" and changes the root queue
// properties, which the tests expect to change the config checksum.
const updatedConf = `
partitions:
- name: default
nodesortpolicy:
type: binpacking
queues:
- name: root
properties:
first: "changedValue"
`
// baseConf is the config TestValidateConf expects the validateConf handler to
// accept: a default partition with the "fair" node sort policy and an open
// submit ACL on root.
const baseConf = `
partitions:
- name: default
nodesortpolicy:
type: fair
queues:
- name: root
submitacl: "*"
`
// invalidConf uses the unknown node sort policy type "invalid";
// TestValidateConf expects it to be rejected with "undefined policy: invalid".
const invalidConf = `
partitions:
- name: default
nodesortpolicy:
type: invalid
queues:
- name: root
`
// configDefault is the single partition config passed to setup() by most
// utilisation tests: a root queue with an open submit ACL and the queues
// "default" and "noapps".
const configDefault = `
partitions:
- name: default
queues:
- name: root
submitacl: "*"
queues:
- name: default
- name: noapps
`
// configMultiPartitions declares two partitions, "gpu" and "default"; it is
// loaded by TestGetNodeUtilisations to check per partition results.
const configMultiPartitions = `
partitions:
-
name: gpu
queues:
-
name: root
-
name: default
nodesortpolicy:
type: fair
queues:
-
name: root
queues:
-
name: default
submitacl: "*"
`
// configTwoLevelQueues declares the partitions "gpu" and "default"; the
// default partition uses the "binpacking" node sort policy and defines the
// queues root, a and a1 together with a child template and guaranteed/max
// resources.
// NOTE(review): the exact nesting of the queue entries cannot be confirmed
// from this view of the file — verify against the original indentation.
const configTwoLevelQueues = `
partitions:
-
name: gpu
queues:
-
name: root
-
name: default
nodesortpolicy:
type: binpacking
queues:
-
name: root
properties:
application.sort.policy: fifo
childtemplate:
maxapplications: 10
properties:
application.sort.policy: fifo
resources:
guaranteed:
memory: 400000
max:
memory: 600000
queues:
-
name: a
queues:
-
name: a1
properties:
application.sort.policy: fifo
resources:
guaranteed:
memory: 500000
vcore: 50000
max:
memory: 800000
vcore: 80000
resources:
guaranteed:
memory: 500000
vcore: 50000
max:
memory: 800000
vcore: 80000
`
// userGroupLimitsConfig sets a user limit for test_user with maxapplications 0
// and a cpu maxresources of "200"; TestUserGroupLimits expects it to be allowed.
const userGroupLimitsConfig = `
partitions:
- name: default
queues:
- name: root
parent: true
submitacl: '*'
queues:
- name: parent1
parent: true
limits:
- limit: ""
users:
- test_user
maxapplications: 0
maxresources:
cpu: "200"
`
// userGroupLimitsInvalidConfig sets a cpu maxresources of "0" for test_user;
// TestUserGroupLimits expects it to be rejected with
// "MaxResources should be greater than zero in '' limit".
const userGroupLimitsInvalidConfig = `
partitions:
- name: default
queues:
- name: root
parent: true
submitacl: '*'
queues:
- name: parent1
parent: true
limits:
- limit: ""
users:
- test_user
maxapplications: 1
maxresources:
cpu: "0"
`
// userGroupLimitsInvalidConfig1 declares a limit for test_user without any
// maxapplications or maxresources; TestUserGroupLimits expects it to be
// rejected because all resource limits are null.
const userGroupLimitsInvalidConfig1 = `
partitions:
- name: default
queues:
- name: root
parent: true
submitacl: '*'
queues:
- name: parent1
parent: true
limits:
- limit: ""
users:
- test_user
`
// groupsLimitsConfig declares a group limit for "testgroup" with a cpu
// maxresources of "200" on the default queue.
const groupsLimitsConfig = `
partitions:
- name: default
queues:
- name: root
parent: true
submitacl: '*'
queues:
- name: default
limits:
- limit: ""
groups:
- testgroup
maxresources:
cpu: "200"
`
// rmID and policyGroup are the values passed to scheduler.NewClusterContext by the tests.
const rmID = "rm-123"
const policyGroup = "default-policy-group"
// queueName is the queue the test applications are created in.
const queueName = "root.default"
// nodeID is the ID of the node added in TestGetClusterUtilJSON.
const nodeID = "node-1"
var (
// updatedExtraConf is the extra config map installed via configs.SetConfigMap
// in the cluster config tests and compared against the Extra field of the
// returned config.
updatedExtraConf = map[string]string{
"log.level": "info",
"service.schedulingInterval": "1s",
"admissionController.accessControl.bypassAuth": "false",
}
)
// setup creates a new cluster context from the YAML config passed in and
// stores it in the package level schedulerContext used by the handlers.
// The test is failed when the config cannot be loaded, when the number of
// partitions differs from partitionCount, or when the default partition is
// missing or already contains applications.
// It returns the default partition of the new context.
func setup(t *testing.T, config string, partitionCount int) *scheduler.PartitionContext {
	// report failures at the line of the calling test
	t.Helper()
	var err error
	schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup, []byte(config))
	assert.NilError(t, err, "Error when load clusterInfo from config")
	assert.Equal(t, partitionCount, len(schedulerContext.GetPartitionMapClone()))

	// Check default partition
	partitionName := common.GetNormalizedPartitionName("default", rmID)
	part := schedulerContext.GetPartition(partitionName)
	// fail with a readable message instead of dereferencing a missing partition
	assert.Assert(t, part != nil, "default partition not found: %s", partitionName)
	assert.Equal(t, 0, len(part.GetApplications()))
	return part
}
// newApplication is a small convenience constructor for test applications.
// When ugi carries no user name a fixed test identity (testuser/testgroup) is
// used instead.
func newApplication(appID, partitionName, queueName, rmID string, ugi security.UserGroup) *objects.Application {
	// ugi is a copy, replacing it does not affect the caller
	if ugi.User == "" {
		ugi = security.UserGroup{User: "testuser", Groups: []string{"testgroup"}}
	}
	request := &si.AddApplicationRequest{
		ApplicationID: appID,
		QueueName:     queueName,
		PartitionName: partitionName,
	}
	return objects.NewApplication(request, ugi, nil, rmID)
}
// TestValidateConf posts a valid and an invalid scheduler config to the
// validateConf handler and checks the allowed flag and reason in the response.
func TestValidateConf(t *testing.T) {
	type validateCase struct {
		body string
		want dao.ValidateConfResponse
	}
	cases := []validateCase{
		{
			body: baseConf,
			want: dao.ValidateConfResponse{Allowed: true, Reason: ""},
		},
		{
			body: invalidConf,
			want: dao.ValidateConfResponse{Allowed: false, Reason: "undefined policy: invalid"},
		},
	}
	for i := range cases {
		tc := cases[i]
		// No err check: new request always returns correctly
		//nolint: errcheck
		req, _ := http.NewRequest("POST", "", strings.NewReader(tc.body))
		writer := &MockResponseWriter{}
		validateConf(writer, req)

		var got dao.ValidateConfResponse
		err := json.Unmarshal(writer.outputBytes, &got)
		assert.NilError(t, err, unmarshalError)
		assert.Equal(t, got.Allowed, tc.want.Allowed, "allowed flag incorrect")
		assert.Equal(t, got.Reason, tc.want.Reason, "response text not as expected")
	}
}
// TestUserGroupLimits runs configs with user limits through the validateConf
// handler: one valid limit, one with a zero cpu maximum and one without any
// limit values, and checks the allowed flag and reason for each.
func TestUserGroupLimits(t *testing.T) {
	type limitCase struct {
		body string
		want dao.ValidateConfResponse
	}
	cases := []limitCase{
		{
			body: userGroupLimitsConfig,
			want: dao.ValidateConfResponse{Allowed: true, Reason: common.Empty},
		},
		{
			body: userGroupLimitsInvalidConfig,
			want: dao.ValidateConfResponse{Allowed: false, Reason: "MaxResources should be greater than zero in '' limit"},
		},
		{
			body: userGroupLimitsInvalidConfig1,
			want: dao.ValidateConfResponse{Allowed: false, Reason: "invalid resource combination for limit all resource limits are null"},
		},
	}
	for i := range cases {
		tc := cases[i]
		// No err check: new request always returns correctly
		//nolint: errcheck
		req, _ := http.NewRequest("POST", "", strings.NewReader(tc.body))
		writer := &MockResponseWriter{}
		validateConf(writer, req)

		var got dao.ValidateConfResponse
		err := json.Unmarshal(writer.outputBytes, &got)
		assert.NilError(t, err, unmarshalError)
		assert.Equal(t, got.Allowed, tc.want.Allowed, "allowed flag incorrect")
		assert.Equal(t, got.Reason, tc.want.Reason, "response text not as expected")
	}
}
// TestApplicationHistory checks the application history handler in four
// states: history not initialised (error response), initialised but empty,
// partially filled, and rolled over the configured limit of 5 records.
func TestApplicationHistory(t *testing.T) {
	// make sure the history is nil when we finish this test
	defer ResetIMHistory()
	// No err check: new request always returns correctly
	//nolint: errcheck
	req, _ := http.NewRequest("GET", "", strings.NewReader(""))
	resp := &MockResponseWriter{}
	// no init should return an error response
	getApplicationHistory(resp, req)

	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusInternalServerError, resp.statusCode, "app history handler returned wrong status")
	assert.Equal(t, errInfo.Message, "Internal metrics collection is not enabled.", jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusInternalServerError)

	// an initialised but empty history must return an empty, non-nil list
	imHistory = history.NewInternalMetricsHistory(5)
	resp = &MockResponseWriter{}
	getApplicationHistory(resp, req)
	var appHist []dao.ApplicationHistoryDAOInfo
	err = json.Unmarshal(resp.outputBytes, &appHist)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, resp.statusCode, 0, "app response should have no status")
	assert.Assert(t, appHist != nil, "appHist should not be nil")
	assert.Equal(t, len(appHist), 0, "empty response must have no records")

	// add new history records
	imHistory.Store(1, 0)
	imHistory.Store(2, 0)
	imHistory.Store(30, 0)
	resp = &MockResponseWriter{}
	getApplicationHistory(resp, req)
	err = json.Unmarshal(resp.outputBytes, &appHist)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, resp.statusCode, 0, "app response should have no status")
	assert.Equal(t, len(appHist), 3, "incorrect number of records returned")
	assert.Equal(t, appHist[0].TotalApplications, "1", "metric 1 should be 1 apps and was not")
	assert.Equal(t, appHist[2].TotalApplications, "30", "metric 3 should be 30 apps and was not")

	// add new history records roll over the limit
	// this gives us a list of (oldest to newest): 2, 30, 40, 50, 300
	imHistory.Store(40, 0)
	imHistory.Store(50, 0)
	imHistory.Store(300, 0)
	resp = &MockResponseWriter{}
	getApplicationHistory(resp, req)
	err = json.Unmarshal(resp.outputBytes, &appHist)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, resp.statusCode, 0, "app response should have no status")
	assert.Equal(t, len(appHist), 5, "incorrect number of records returned")
	// the first record stored (1) has been pushed out, the oldest is now 2
	assert.Equal(t, appHist[0].TotalApplications, "2", "metric 1 should be 2 apps and was not")
	assert.Equal(t, appHist[4].TotalApplications, "300", "metric 5 should be 300 apps and was not")
}
// TestContainerHistory checks the container history handler in four states:
// history not initialised (error response), initialised but empty, partially
// filled, and rolled over the configured limit of 5 records.
func TestContainerHistory(t *testing.T) {
	// make sure the history is nil when we finish this test
	defer ResetIMHistory()
	// No err check: new request always returns correctly
	//nolint: errcheck
	req, _ := http.NewRequest("GET", "", strings.NewReader(""))
	resp := &MockResponseWriter{}
	// no init should return an error response
	getContainerHistory(resp, req)

	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusInternalServerError, resp.statusCode, "container history handler returned wrong status")
	assert.Equal(t, errInfo.Message, "Internal metrics collection is not enabled.", jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusInternalServerError)

	// an initialised but empty history must return an empty, non-nil list
	imHistory = history.NewInternalMetricsHistory(5)
	resp = &MockResponseWriter{}
	getContainerHistory(resp, req)
	var contHist []dao.ContainerHistoryDAOInfo
	err = json.Unmarshal(resp.outputBytes, &contHist)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, resp.statusCode, 0, "container response should have no status")
	// same fail-fast check as used in the application history test
	assert.Assert(t, contHist != nil, "contHist should not be nil")
	assert.Equal(t, len(contHist), 0, "empty response must have no records")

	// add new history records
	imHistory.Store(0, 1)
	imHistory.Store(0, 2)
	imHistory.Store(0, 30)
	resp = &MockResponseWriter{}
	getContainerHistory(resp, req)
	err = json.Unmarshal(resp.outputBytes, &contHist)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, resp.statusCode, 0, "container response should have no status")
	assert.Equal(t, len(contHist), 3, "incorrect number of records returned")
	assert.Equal(t, contHist[0].TotalContainers, "1", "metric 1 should be 1 containers and was not")
	assert.Equal(t, contHist[2].TotalContainers, "30", "metric 3 should be 30 containers and was not")

	// add new history records roll over the limit
	// this gives us a list of (oldest to newest): 2, 30, 40, 50, 300
	imHistory.Store(0, 40)
	imHistory.Store(0, 50)
	imHistory.Store(0, 300)
	resp = &MockResponseWriter{}
	getContainerHistory(resp, req)
	err = json.Unmarshal(resp.outputBytes, &contHist)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, resp.statusCode, 0, "container response should have no status")
	assert.Equal(t, len(contHist), 5, "incorrect number of records returned")
	// the first record stored (1) has been pushed out, the oldest is now 2
	assert.Equal(t, contHist[0].TotalContainers, "2", "metric 1 should be 2 containers and was not")
	assert.Equal(t, contHist[4].TotalContainers, "300", "metric 5 should be 300 containers and was not")
}
// TestGetConfigYAML checks that getClusterConfig returns the scheduler config
// as YAML (also for an unknown Accept header) and that the node sort policy,
// the checksum and the extra config map follow a config update.
func TestGetConfigYAML(t *testing.T) {
	var err error
	schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup, []byte(startConf))
	assert.NilError(t, err, "Error when load clusterInfo from config")
	// No err check: new request always returns correctly
	//nolint: errcheck
	req, _ := http.NewRequest("GET", "", nil)
	resp := &MockResponseWriter{}
	getClusterConfig(resp, req)
	// yaml unmarshal handles the checksum at the end automatically in this implementation
	conf := &dao.ConfigDAOInfo{}
	err = yaml.Unmarshal(resp.outputBytes, conf)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "fair", "node sort policy set incorrectly, not fair")
	startConfSum := conf.Checksum
	assert.Assert(t, len(startConfSum) > 0, "checksum boundary not found")

	// change the config
	err = schedulerContext.UpdateRMSchedulerConfig(rmID, []byte(updatedConf))
	assert.NilError(t, err, "Error when updating clusterInfo from config")
	// reset the extra config map on exit, also when an assertion below aborts
	// the test, so the global config does not leak into other tests
	defer configs.SetConfigMap(map[string]string{})
	configs.SetConfigMap(updatedExtraConf)

	// check that we return yaml by default, unmarshal will error when we don't
	req.Header.Set("Accept", "unknown")
	resp = &MockResponseWriter{}
	getClusterConfig(resp, req)
	err = yaml.Unmarshal(resp.outputBytes, conf)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "binpacking", "node sort policy not updated")
	assert.Assert(t, startConfSum != conf.Checksum, "checksums did not change in output")
	assert.DeepEqual(t, conf.Extra, updatedExtraConf)
}
// TestGetConfigJSON checks that getClusterConfig returns the scheduler config
// as JSON when asked for application/json and that the node sort policy, the
// checksum and the extra config map follow a config update.
func TestGetConfigJSON(t *testing.T) {
	setup(t, startConf, 1)
	// No err check: new request always returns correctly
	//nolint: errcheck
	req, _ := http.NewRequest("GET", "", nil)
	req.Header.Set("Accept", "application/json")
	resp := &MockResponseWriter{}
	getClusterConfig(resp, req)

	conf := &dao.ConfigDAOInfo{}
	err := json.Unmarshal(resp.outputBytes, conf)
	assert.NilError(t, err, unmarshalError)
	startConfSum := conf.Checksum
	assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "fair", "node sort policy set incorrectly, not fair (json)")

	// change the config
	err = schedulerContext.UpdateRMSchedulerConfig(rmID, []byte(updatedConf))
	assert.NilError(t, err, "Error when updating clusterInfo from config")
	// reset the extra config map on exit, also when an assertion below aborts
	// the test, so the global config does not leak into other tests
	defer configs.SetConfigMap(map[string]string{})
	configs.SetConfigMap(updatedExtraConf)

	resp = &MockResponseWriter{}
	getClusterConfig(resp, req)
	err = json.Unmarshal(resp.outputBytes, conf)
	assert.NilError(t, err, unmarshalError)
	assert.Assert(t, startConfSum != conf.Checksum, "checksums did not change in json output: %s, %s", startConfSum, conf.Checksum)
	assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "binpacking", "node sort policy not updated (json)")
	assert.DeepEqual(t, conf.Extra, updatedExtraConf)
}
// TestGetClusterUtilJSON covers two things: the RM build information returned
// by getRMBuildInformation, and the cluster utilisation reported by
// getClusterUtilJSON before and after a node with allocations is added.
func TestGetClusterUtilJSON(t *testing.T) {
	setup(t, configDefault, 1)

	// register the build details of the RM
	buildInfo := map[string]string{
		"buildDate":       "2006-01-02T15:04:05-0700",
		"buildVersion":    "latest",
		"isPluginVersion": "false",
	}
	schedulerContext.SetRMInfo(rmID, buildInfo)

	// no RM info passed in: nothing returned
	buildInfoList := getRMBuildInformation(nil)
	assert.Equal(t, 0, len(buildInfoList))

	// RM info from the context: one entry matching what was registered
	rmInfoClone := schedulerContext.GetRMInfoMapClone()
	assert.Equal(t, 1, len(rmInfoClone))
	buildInfoList = getRMBuildInformation(rmInfoClone)
	assert.Equal(t, 1, len(buildInfoList))
	assert.Equal(t, buildInfoList[0]["buildDate"], buildInfo["buildDate"])
	assert.Equal(t, buildInfoList[0]["buildVersion"], buildInfo["buildVersion"])
	assert.Equal(t, buildInfoList[0]["isPluginVersion"], buildInfo["isPluginVersion"])
	assert.Equal(t, buildInfoList[0]["rmId"], rmID)

	// look up the partition under test
	fullName := common.GetNormalizedPartitionName("default", rmID)
	part := schedulerContext.GetPartition(fullName)
	assert.Equal(t, fullName, part.Name)

	// register an application with the partition
	appID := "appID-1"
	err := part.AddApplication(newApplication(appID, fullName, queueName, rmID, security.UserGroup{}))
	assert.NilError(t, err, "add application to partition should not have failed")

	// without nodes neither total nor allocated resources are defined
	undefinedUtil := &dao.ClusterUtilDAOInfo{
		ResourceType: "N/A",
		Total:        int64(-1),
		Used:         int64(-1),
		Usage:        "N/A",
	}
	emptyResult := getClusterUtilJSON(part)
	assert.Equal(t, ContainsObj(emptyResult, undefinedUtil), true)

	// add a node that already carries two allocations
	capacity := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}).ToProto()
	node := objects.NewNode(&si.NodeInfo{NodeID: nodeID, SchedulableResource: capacity})
	firstRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300})
	secondRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 200})
	firstAsk := objects.NewAllocationAsk("alloc-1", appID, firstRes)
	secondAsk := objects.NewAllocationAsk("alloc-2", appID, secondRes)
	firstAlloc := objects.NewAllocation(nodeID, firstAsk)
	secondAlloc := objects.NewAllocation(nodeID, secondAsk)
	err = part.AddNode(node, []*objects.Allocation{firstAlloc, secondAlloc})
	assert.NilError(t, err, "add node to partition should not have failed")

	// expected utilisation: 800 of 1000 memory and 500 of 1000 cpu
	wantMemory := &dao.ClusterUtilDAOInfo{
		ResourceType: siCommon.Memory,
		Total:        int64(1000),
		Used:         int64(800),
		Usage:        "80%",
	}
	wantCPU := &dao.ClusterUtilDAOInfo{
		ResourceType: siCommon.CPU,
		Total:        int64(1000),
		Used:         int64(500),
		Usage:        "50%",
	}
	// both resource types must be part of the result
	filledResult := getClusterUtilJSON(part)
	assert.Equal(t, ContainsObj(filledResult, wantMemory), true)
	assert.Equal(t, ContainsObj(filledResult, wantCPU), true)
}
// ContainsObj reports whether the slice, array or string passed in as slice
// has an element equal to contains.
//
// An element matches when it is deeply equal to contains (reflect.DeepEqual,
// which also covers plain == equality), or when both print identically using
// the %#v format. Element types that cannot be compared with ==, like slices
// or maps, are supported.
//
// A nil value, or a value that is not a slice, array or string, never
// contains anything: false is returned instead of panicking.
func ContainsObj(slice interface{}, contains interface{}) bool {
	value := reflect.ValueOf(slice)
	switch value.Kind() {
	case reflect.Slice, reflect.Array, reflect.String:
		// kinds that support both Len and Index
	default:
		return false
	}
	// the wanted representation does not change: format it only once
	wanted := fmt.Sprintf("%#v", contains)
	for i := 0; i < value.Len(); i++ {
		item := value.Index(i).Interface()
		// DeepEqual instead of ==: comparing interfaces that hold an
		// uncomparable type with == panics at runtime
		if reflect.DeepEqual(item, contains) {
			return true
		}
		if fmt.Sprintf("%#v", item) == wanted {
			return true
		}
	}
	return false
}
// TestGetNodesUtilJSON adds three nodes with different resource types and
// allocations to the default partition and checks in which utilisation bucket
// getNodesUtilJSON reports each node for memory, cpu, GPU and an unknown type.
func TestGetNodesUtilJSON(t *testing.T) {
	part := setup(t, configDefault, 1)

	// the application that owns the allocations
	appID := "app1"
	err := part.AddApplication(newApplication(appID, part.Name, queueName, rmID, security.UserGroup{}))
	assert.NilError(t, err, "add application to partition should not have failed")

	// nodes: memory+cpu, memory+cpu+GPU and cpu only
	memCPU := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}).ToProto()
	firstID := "node-1"
	first := objects.NewNode(&si.NodeInfo{NodeID: firstID, SchedulableResource: memCPU})
	secondID := "node-2"
	memCPUGPU := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000, "GPU": 10}).ToProto()
	second := objects.NewNode(&si.NodeInfo{NodeID: secondID, SchedulableResource: memCPUGPU})
	thirdID := "node-3"
	cpuOnly := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU: 1000}).ToProto()
	third := objects.NewNode(&si.NodeInfo{NodeID: thirdID, SchedulableResource: cpuOnly})

	// one allocation for each of the first two nodes, the third stays empty
	firstRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300})
	secondRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500, "GPU": 5})
	firstAsk := objects.NewAllocationAsk("alloc-1", appID, firstRes)
	secondAsk := objects.NewAllocationAsk("alloc-2", appID, secondRes)
	err = part.AddNode(first, []*objects.Allocation{objects.NewAllocation(firstID, firstAsk)})
	assert.NilError(t, err, "add node to partition should not have failed")
	err = part.AddNode(second, []*objects.Allocation{objects.NewAllocation(secondID, secondAsk)})
	assert.NilError(t, err, "add node to partition should not have failed")
	err = part.AddNode(third, nil)
	assert.NilError(t, err, "add node to partition should not have failed")

	// two nodes advertise memory: must show up in the list
	util := getNodesUtilJSON(part, siCommon.Memory)
	buckets := util.NodesUtil
	assert.Equal(t, util.ResourceType, siCommon.Memory)
	assert.Equal(t, buckets[2].NumOfNodes, int64(1))
	assert.Equal(t, buckets[4].NumOfNodes, int64(1))
	assert.Equal(t, buckets[2].NodeNames[0], secondID)
	assert.Equal(t, buckets[4].NodeNames[0], firstID)

	// three nodes advertise cpu: must show up in the list
	util = getNodesUtilJSON(part, siCommon.CPU)
	buckets = util.NodesUtil
	assert.Equal(t, util.ResourceType, siCommon.CPU)
	assert.Equal(t, buckets[0].NumOfNodes, int64(1))
	assert.Equal(t, buckets[0].NodeNames[0], thirdID)
	assert.Equal(t, buckets[2].NumOfNodes, int64(1))
	assert.Equal(t, buckets[2].NodeNames[0], firstID)
	assert.Equal(t, buckets[4].NumOfNodes, int64(1))
	assert.Equal(t, buckets[4].NodeNames[0], secondID)

	// one node advertise GPU: must show up in the list
	util = getNodesUtilJSON(part, "GPU")
	buckets = util.NodesUtil
	assert.Equal(t, util.ResourceType, "GPU")
	assert.Equal(t, buckets[4].NumOfNodes, int64(1))
	assert.Equal(t, buckets[4].NodeNames[0], secondID)

	// a resource type no node advertises: first bucket stays empty
	util = getNodesUtilJSON(part, "non-exist")
	buckets = util.NodesUtil
	assert.Equal(t, util.ResourceType, "non-exist")
	assert.Equal(t, buckets[0].NumOfNodes, int64(0))
	assert.Equal(t, len(buckets[0].NodeNames), 0)
}
// TestGetNodeUtilisation calls the getNodeUtilisation handler against an empty
// cluster context, then against the default partition with two nodes while
// allocations are added step by step, checking the resource type reported and
// the number of nodes counted over the 10 utilisation buckets.
func TestGetNodeUtilisation(t *testing.T) {
// start with a cluster context that has no partitions
NewWebApp(&scheduler.ClusterContext{}, nil)
// var req *http.Request
req, err := http.NewRequest("GET", "/ws/v1/scheduler/node-utilization", strings.NewReader(""))
assert.NilError(t, err, "Get node utilisation Handler request failed")
req = req.WithContext(context.TODO())
resp := &MockResponseWriter{}
getNodeUtilisation(resp, req)
// the response must at least decode as an error object
var errInfo dao.YAPIError
err = json.Unmarshal(resp.outputBytes, &errInfo)
assert.NilError(t, err, "getNodeUtilisation should have returned and error")
partition := setup(t, configDefault, 1)
// NOTE(review): resp is not refreshed after setup(): the bytes decoded below
// are still the response of the call made before the partition existed.
// Confirm whether a new getNodeUtilisation call was intended here.
utilisation := &dao.NodesUtilDAOInfo{}
err = json.Unmarshal(resp.outputBytes, utilisation)
assert.NilError(t, err, "getNodeUtilisation should have returned an empty object")
assert.Equal(t, utilisation.ResourceType, "", "unexpected type returned")
assert.Equal(t, len(utilisation.NodesUtil), 0, "no nodes should be returned")
assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0), "unexpected number of nodes returned should be 0")
// create test nodes
node1ID := "node-1"
node2ID := "node-2"
node1 := addNode(t, partition, node1ID, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
node2 := addNode(t, partition, node2ID, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 5}))
// get nodes utilization
// nothing is allocated yet: no resource type reported and no node counted
resp = &MockResponseWriter{}
getNodeUtilisation(resp, req)
utilisation = &dao.NodesUtilDAOInfo{}
err = json.Unmarshal(resp.outputBytes, utilisation)
assert.NilError(t, err, "getNodeUtilisation should have returned an object")
assert.Equal(t, utilisation.ResourceType, "", "unexpected type returned")
assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage: unexpected bucket count returned")
assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0), "unexpected number of nodes returned should be 0")
// allocate all of "first" on node-1 and account for it on the root queue
resAlloc := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
ask := objects.NewAllocationAsk("alloc-1", "app", resAlloc)
alloc := objects.NewAllocation(node1ID, ask)
assert.Assert(t, node1.AddAllocation(alloc), "unexpected failure adding allocation to node")
rootQ := partition.GetQueue("root")
err = rootQ.IncAllocatedResource(resAlloc, false)
assert.NilError(t, err, "unexpected error returned setting allocated resource on queue")
// get nodes utilization
// "first" is the only type in use; both nodes advertise it
resp = &MockResponseWriter{}
getNodeUtilisation(resp, req)
utilisation = &dao.NodesUtilDAOInfo{}
err = json.Unmarshal(resp.outputBytes, utilisation)
assert.NilError(t, err, "getNodeUtilisation should have returned an object")
assert.Equal(t, utilisation.ResourceType, "first", "expected first as type returned")
assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage: unexpected bucket count returned")
assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 2), "unexpected number of nodes returned should be 2")
// make second type dominant by using all
resAlloc = resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5})
ask = objects.NewAllocationAsk("alloc-2", "app", resAlloc)
alloc = objects.NewAllocation(node2ID, ask)
assert.Assert(t, node2.AddAllocation(alloc), "unexpected failure adding allocation to node")
err = rootQ.IncAllocatedResource(resAlloc, false)
assert.NilError(t, err, "unexpected error returned setting allocated resource on queue")
// get nodes utilization
// only node-2 advertises "second", so one node is counted
resp = &MockResponseWriter{}
getNodeUtilisation(resp, req)
utilisation = &dao.NodesUtilDAOInfo{}
err = json.Unmarshal(resp.outputBytes, utilisation)
assert.NilError(t, err, "getNodeUtilisation should have returned an object")
assert.Equal(t, utilisation.ResourceType, "second", "expected second as type returned")
assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage: unexpected bucket count returned")
assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 1), "unexpected number of nodes returned should be 1")
}
// addNode creates a node with the given ID and schedulable resource, adds it
// without allocations to the partition and returns it. The test is failed
// when the partition refuses the node.
func addNode(t *testing.T, partition *scheduler.PartitionContext, nodeId string, resource *resources.Resource) *objects.Node {
	// report failures at the line of the calling test, like the other helpers
	t.Helper()
	nodeRes := resource.ToProto()
	node := objects.NewNode(&si.NodeInfo{NodeID: nodeId, SchedulableResource: nodeRes})
	err := partition.AddNode(node, nil)
	assert.NilError(t, err, "adding node to partition should not fail")
	return node
}
// addAllocatedResource builds an allocation of the quantities in quantityMap
// for the application and allocation key given and adds it directly to the
// node. The test is failed when the node does not accept the allocation.
func addAllocatedResource(t *testing.T, node *objects.Node, allocationKey string, appID string, quantityMap map[string]resources.Quantity) {
	t.Helper()
	request := objects.NewAllocationAsk(allocationKey, appID, resources.NewResourceFromMap(quantityMap))
	allocation := objects.NewAllocation(node.NodeID, request)
	assert.Assert(t, node.AddAllocation(allocation), "unexpected failure adding allocation to node")
}
// confirmNodeCount sums the NumOfNodes of all utilisation buckets in info and
// reports whether that sum equals count.
func confirmNodeCount(info []*dao.NodeUtilDAOInfo, count int64) bool {
	sum := int64(0)
	for i := range info {
		sum += info[i].NumOfNodes
	}
	return sum == count
}
// addAndConfirmApplicationExists creates an application named appName in the
// root.default queue for the fixed test user, adds it to the partition and
// checks that it is in the New state. The application is returned.
func addAndConfirmApplicationExists(t *testing.T, partitionName string, partition *scheduler.PartitionContext, appName string) *objects.Application {
	// report failures at the line of the calling test, like the other helpers
	t.Helper()
	// add a new app
	app := newApplication(appName, partitionName, "root.default", rmID, security.UserGroup{User: "testuser", Groups: []string{"testgroup"}})
	err := partition.AddApplication(app)
	assert.NilError(t, err, "Failed to add Application to Partition.")
	assert.Equal(t, app.CurrentState(), objects.New.String())
	return app
}
// TestGetPartitionNodesUtilJSON adds three nodes with different resource types
// to the default partition, allocates on two of them and checks the per
// resource type bucket content returned by getPartitionNodesUtilJSON.
func TestGetPartitionNodesUtilJSON(t *testing.T) {
	part := setup(t, configDefault, 1)
	const (
		app    = "app1"
		first  = "node-1"
		second = "node-2"
		third  = "node-3"
	)

	// nodes: memory+cpu, memory+cpu+GPU and cpu only
	firstNode := addNode(t, part, first, resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}))
	secondNode := addNode(t, part, second, resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000, "GPU": 10}))
	addNode(t, part, third, resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU: 1000}))

	// allocations on the first two nodes, the third stays unused
	addAllocatedResource(t, firstNode, "alloc-1", app, map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300})
	addAllocatedResource(t, secondNode, "alloc-2", app, map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500, "GPU": 5})

	// partition level details
	util := getPartitionNodesUtilJSON(part)
	assert.Equal(t, util.ClusterID, rmID)
	assert.Equal(t, util.Partition, "default")
	assert.Equal(t, len(util.NodesUtilList), 3, "Should have 3 resource types(CPU/Memory/GPU) in the list.")

	// two nodes advertise memory: must show up in the list
	memory := getNodesUtilByType(t, util.NodesUtilList, siCommon.Memory)
	assert.Equal(t, memory.NodesUtil[2].NumOfNodes, int64(1))
	assert.Equal(t, memory.NodesUtil[4].NumOfNodes, int64(1))
	assert.Equal(t, memory.NodesUtil[2].NodeNames[0], second)
	assert.Equal(t, memory.NodesUtil[4].NodeNames[0], first)

	// three nodes advertise cpu: must show up in the list
	cpu := getNodesUtilByType(t, util.NodesUtilList, siCommon.CPU)
	assert.Equal(t, cpu.NodesUtil[0].NumOfNodes, int64(1))
	assert.Equal(t, cpu.NodesUtil[0].NodeNames[0], third)
	assert.Equal(t, cpu.NodesUtil[2].NumOfNodes, int64(1))
	assert.Equal(t, cpu.NodesUtil[2].NodeNames[0], first)
	assert.Equal(t, cpu.NodesUtil[4].NumOfNodes, int64(1))
	assert.Equal(t, cpu.NodesUtil[4].NodeNames[0], second)

	// one node advertise GPU: must show up in the list
	gpu := getNodesUtilByType(t, util.NodesUtilList, "GPU")
	assert.Equal(t, gpu.NodesUtil[4].NumOfNodes, int64(1))
	assert.Equal(t, gpu.NodesUtil[4].NodeNames[0], second)
}
// TestGetNodeUtilisations calls the getNodeUtilisations handler against an
// empty cluster context, which must give an empty list, and then against a
// context with a default and a gpu partition holding nodes with allocations,
// checking the resource types and node counts reported for each partition.
func TestGetNodeUtilisations(t *testing.T) {
	// setup
	NewWebApp(&scheduler.ClusterContext{}, nil)
	req, err := http.NewRequest("GET", "/ws/v1/scheduler/node-utilizations", strings.NewReader(""))
	assert.NilError(t, err, "Get node utilisations Handler request failed")
	resp := &MockResponseWriter{}

	getNodeUtilisations(resp, req)
	var partitionNodesUtilDAOInfo []*dao.PartitionNodesUtilDAOInfo
	err = json.Unmarshal(resp.outputBytes, &partitionNodesUtilDAOInfo)
	assert.NilError(t, err, "should decode a empty list of *dao.PartitionNodesUtilDAOInfo")
	assert.Equal(t, len(partitionNodesUtilDAOInfo), 0)

	// setup partitions
	schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup, []byte(configMultiPartitions))
	assert.NilError(t, err, "Error when load clusterInfo from config")
	defaultPartition := schedulerContext.GetPartition(common.GetNormalizedPartitionName("default", rmID))
	// fail with a readable message instead of using a missing partition below
	assert.Assert(t, defaultPartition != nil, "default partition should exist")
	gpuPartition := schedulerContext.GetPartition(common.GetNormalizedPartitionName("gpu", rmID))
	assert.Assert(t, gpuPartition != nil, "gpu partition should exist")

	// add nodes to partitions
	node1 := addNode(t, defaultPartition, "node-1", resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}))
	node2 := addNode(t, defaultPartition, "node-2", resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcore": 5}))
	node3 := addNode(t, defaultPartition, "node-3", resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 20, "vcore": 15}))
	node4 := addNode(t, gpuPartition, "node-4", resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 10}))

	// add allocatedResource to nodes
	addAllocatedResource(t, node1, "alloc-1", "app-1", map[string]resources.Quantity{"memory": 1})
	addAllocatedResource(t, node2, "alloc-1", "app-1", map[string]resources.Quantity{"memory": 1, "vcore": 1})
	addAllocatedResource(t, node3, "alloc-1", "app-1", map[string]resources.Quantity{"memory": 1, "vcore": 1})
	addAllocatedResource(t, node4, "alloc-1", "app-1", map[string]resources.Quantity{"gpu": 1})

	// get nodes utilizations
	resp = &MockResponseWriter{}
	getNodeUtilisations(resp, req)
	err = json.Unmarshal(resp.outputBytes, &partitionNodesUtilDAOInfo)
	assert.NilError(t, err, "should decode a list of *dao.PartitionNodesUtilDAOInfo")
	assert.Equal(t, len(partitionNodesUtilDAOInfo), 2)
	assert.Equal(t, partitionNodesUtilDAOInfo[0].ClusterID, rmID)
	assert.Equal(t, partitionNodesUtilDAOInfo[1].ClusterID, rmID)

	// the order of the partitions in the response is not fixed: swap if needed
	defaultPartitionNodesUtilDAOInfo := partitionNodesUtilDAOInfo[0]
	gpuPartitionNodesUtilDAOInfo := partitionNodesUtilDAOInfo[1]
	if defaultPartitionNodesUtilDAOInfo.Partition == "gpu" {
		defaultPartitionNodesUtilDAOInfo = partitionNodesUtilDAOInfo[1]
		gpuPartitionNodesUtilDAOInfo = partitionNodesUtilDAOInfo[0]
	}
	assert.Equal(t, len(defaultPartitionNodesUtilDAOInfo.NodesUtilList), 2)
	assert.Equal(t, len(gpuPartitionNodesUtilDAOInfo.NodesUtilList), 1)
	assertNodeUtilisationContent(t, defaultPartitionNodesUtilDAOInfo, "memory", 3)
	assertNodeUtilisationContent(t, defaultPartitionNodesUtilDAOInfo, "vcore", 2)
	assertNodeUtilisationContent(t, gpuPartitionNodesUtilDAOInfo, "gpu", 1)
}
// assertNodeUtilisationContent looks up the utilisation entry for resourceType
// in the given partition info and asserts that it carries that resource type,
// holds exactly 10 buckets and passes the confirmNodeCount check for
// expectedNodeCount.
func assertNodeUtilisationContent(t *testing.T, partitionNodesUtilDAOInfo *dao.PartitionNodesUtilDAOInfo, resourceType string, expectedNodeCount int) {
	t.Helper()
	util := getNodesUtilByType(t, partitionNodesUtilDAOInfo.NodesUtilList, resourceType)
	assert.Equal(t, util.ResourceType, resourceType,
		"should have returned '%s', but got '%s'", resourceType, util.ResourceType)
	bucketCount := len(util.NodesUtil)
	assert.Equal(t, bucketCount, 10,
		"should have 10 bucket, but got %d", bucketCount)
	assert.Assert(t, confirmNodeCount(util.NodesUtil, int64(expectedNodeCount)),
		"unexpected number of nodes returned, should be %d", expectedNodeCount)
}
// getNodesUtilByType returns the first entry of nodesUtilList whose
// ResourceType equals resourceType. The test is failed immediately when no
// entry matches.
func getNodesUtilByType(t *testing.T, nodesUtilList []*dao.NodesUtilDAOInfo, resourceType string) *dao.NodesUtilDAOInfo {
	t.Helper()
	for idx := range nodesUtilList {
		if candidate := nodesUtilList[idx]; candidate.ResourceType == resourceType {
			return candidate
		}
	}
	t.Fatalf("should have returned a *dao.NodesUtilDAOInfo with resourceType %s", resourceType)
	// not reached at runtime: Fatalf stops the test, the return satisfies the compiler
	return nil
}
// TestPartitions checks the partitions handler: first against an empty
// cluster context, then against the multi partition config with applications
// in different states and two nodes that each carry one allocation.
func TestPartitions(t *testing.T) {
// an empty cluster context must yield an empty, but non-nil, partition list
schedulerContext = &scheduler.ClusterContext{}
var req *http.Request
req, err := http.NewRequest("GET", "/ws/v1/partitions", strings.NewReader(""))
assert.NilError(t, err, "App Handler request failed")
resp := &MockResponseWriter{}
var partitionInfo []*dao.PartitionInfo
getPartitions(resp, req)
err = json.Unmarshal(resp.outputBytes, &partitionInfo)
assert.NilError(t, err, unmarshalError)
assert.Check(t, partitionInfo != nil, "partitionInfo should not be nil")
assert.Equal(t, len(partitionInfo), 0)
defaultPartition := setup(t, configMultiPartitions, 2)
partitionName := defaultPartition.Name
// add a new app (its state is not changed afterwards)
addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-0")
// add a new app1 - accepted
app1 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-1")
app1.SetState(objects.Accepted.String())
// add a new app2 - running
app2 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-2")
app2.SetState(objects.Running.String())
// add a new app3 - completing
app3 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-3")
app3.SetState(objects.Completing.String())
// add a new app4 - rejected
app4 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-4")
app4.SetState(objects.Rejected.String())
// add a new app5 - completed
app5 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-5")
app5.SetState(objects.Completed.String())
// add a new app6 - failed
app6 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-6")
app6.SetState(objects.Failed.String())
NewWebApp(schedulerContext, nil)
// create test nodes: two nodes with 500 memory and 500 vcore each
nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 500}).ToProto()
node1ID := "node-1"
node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, SchedulableResource: nodeRes})
node2ID := "node-2"
node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, SchedulableResource: nodeRes})
// create test allocations: one per node, passed in when the node is added
resAlloc1 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 100, siCommon.CPU: 400})
resAlloc2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 200, siCommon.CPU: 300})
ask1 := objects.NewAllocationAsk("alloc-1", app5.ApplicationID, resAlloc1)
ask2 := objects.NewAllocationAsk("alloc-2", app2.ApplicationID, resAlloc2)
allocs := []*objects.Allocation{objects.NewAllocation(node1ID, ask1)}
err = defaultPartition.AddNode(node1, allocs)
assert.NilError(t, err, "add node to partition should not have failed")
allocs = []*objects.Allocation{objects.NewAllocation(node2ID, ask2)}
err = defaultPartition.AddNode(node2, allocs)
assert.NilError(t, err, "add node to partition should not have failed")
// query the handler again now that the partitions are populated
req, err = http.NewRequest("GET", "/ws/v1/partitions", strings.NewReader(""))
assert.NilError(t, err, "App Handler request failed")
resp = &MockResponseWriter{}
getPartitions(resp, req)
err = json.Unmarshal(resp.outputBytes, &partitionInfo)
assert.NilError(t, err, unmarshalError)
// index the result by partition name so the checks do not depend on ordering
cs := make(map[string]*dao.PartitionInfo, 2)
for _, d := range partitionInfo {
cs[d.Name] = d
}
assert.Assert(t, cs["default"] != nil)
assert.Equal(t, cs["default"].ClusterID, "rm-123")
assert.Equal(t, cs["default"].Name, "default")
assert.Equal(t, cs["default"].NodeSortingPolicy.Type, "fair")
assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["vcore"], 1.0)
assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["memory"], 1.0)
// seven applications were added above, one per asserted state
assert.Equal(t, cs["default"].Applications["total"], 7)
assert.Equal(t, cs["default"].Applications[objects.New.String()], 1)
assert.Equal(t, cs["default"].Applications[objects.Accepted.String()], 1)
assert.Equal(t, cs["default"].Applications[objects.Running.String()], 1)
assert.Equal(t, cs["default"].Applications[objects.Completing.String()], 1)
assert.Equal(t, cs["default"].Applications[objects.Rejected.String()], 1)
assert.Equal(t, cs["default"].Applications[objects.Completed.String()], 1)
assert.Equal(t, cs["default"].Applications[objects.Failed.String()], 1)
// capacity: 2 nodes x 500; used: 100+200 memory and 400+300 vcore
assert.DeepEqual(t, cs["default"].Capacity.Capacity, map[string]int64{"memory": 1000, "vcore": 1000})
assert.DeepEqual(t, cs["default"].Capacity.UsedCapacity, map[string]int64{"memory": 300, "vcore": 700})
assert.DeepEqual(t, cs["default"].Capacity.Utilization, map[string]int64{"memory": 30, "vcore": 70})
assert.Equal(t, cs["default"].State, "Active")
assert.Assert(t, cs["gpu"] != nil)
assert.Equal(t, cs["gpu"].ClusterID, "rm-123")
assert.Equal(t, cs["gpu"].Name, "gpu")
// NOTE(review): the next three assertions re-check cs["default"] inside the
// gpu section; this looks like a copy-paste slip and cs["gpu"] was presumably
// intended - confirm the expected gpu values before changing.
assert.Equal(t, cs["default"].NodeSortingPolicy.Type, "fair")
assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["vcore"], 1.0)
assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["memory"], 1.0)
assert.Equal(t, cs["gpu"].Applications["total"], 0)
}
// TestMetricsNotEmpty serves a metrics request through the Prometheus handler
// and asserts that the recorded response body is not empty.
func TestMetricsNotEmpty(t *testing.T) {
	request, err := http.NewRequest("GET", "/ws/v1/metrics", strings.NewReader(""))
	assert.NilError(t, err, "Error while creating the request")

	recorder := httptest.NewRecorder()
	promhttp.Handler().ServeHTTP(recorder, request)

	assert.Assert(t, recorder.Body.Len() > 0, "Metrics response should not be empty")
}
// TestGetPartitionQueuesHandler exercises getPartitionQueues and
// getPartitionQueue against the two level queue config: the full queue
// hierarchy, a single queue with and without its subtree, and the error
// responses for an unknown partition, missing params, an invalid queue name
// and an unknown queue.
func TestGetPartitionQueuesHandler(t *testing.T) {
	setup(t, configTwoLevelQueues, 2)
	NewWebApp(schedulerContext, nil)

	// expected child template, asserted on root and root.a below
	tMaxResource, err := resources.NewResourceFromConf(map[string]string{"memory": "600000"})
	assert.NilError(t, err)
	tGuaranteedResource, err := resources.NewResourceFromConf(map[string]string{"memory": "400000"})
	assert.NilError(t, err)
	templateInfo := dao.TemplateInfo{
		MaxApplications:    10,
		MaxResource:        tMaxResource.DAOMap(),
		GuaranteedResource: tGuaranteedResource.DAOMap(),
		Properties: map[string]string{
			configs.ApplicationSortPolicy: policies.FifoSortPolicy.String(),
		},
	}

	// expected limits, asserted on root.a and root.a.a1 below
	maxResource, err := resources.NewResourceFromConf(map[string]string{"memory": "800000", "vcore": "80000"})
	assert.NilError(t, err)
	guaranteedResource, err := resources.NewResourceFromConf(map[string]string{"memory": "500000", "vcore": "50000"})
	assert.NilError(t, err)

	// the request error is always checked before the request is used: a failed
	// NewRequest returns a nil request and WithContext would panic on it
	var req *http.Request
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queues", strings.NewReader(""))
	assert.NilError(t, err, "Get Queues for PartitionQueues Handler request failed")
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID}}))
	resp := &MockResponseWriter{}
	var partitionQueuesDao dao.PartitionQueueDAOInfo
	getPartitionQueues(resp, req)
	err = json.Unmarshal(resp.outputBytes, &partitionQueuesDao)
	assert.NilError(t, err, unmarshalError)

	// assert root fields
	assert.Equal(t, partitionQueuesDao.QueueName, configs.RootQueue)
	assert.Equal(t, partitionQueuesDao.Status, objects.Active.String())
	assert.Equal(t, partitionQueuesDao.Partition, configs.DefaultPartition)
	assert.Assert(t, partitionQueuesDao.PendingResource == nil)
	assert.Assert(t, partitionQueuesDao.MaxResource == nil)
	assert.Assert(t, partitionQueuesDao.GuaranteedResource == nil)
	assert.Assert(t, partitionQueuesDao.AllocatedResource == nil)
	assert.Assert(t, partitionQueuesDao.PreemptingResource == nil)
	assert.Assert(t, partitionQueuesDao.HeadRoom == nil)
	assert.Assert(t, !partitionQueuesDao.IsLeaf)
	assert.Assert(t, partitionQueuesDao.IsManaged)
	assert.Equal(t, partitionQueuesDao.Parent, "")
	assert.Assert(t, partitionQueuesDao.AbsUsedCapacity == nil)
	assert.Equal(t, partitionQueuesDao.MaxRunningApps, uint64(0))
	assert.Equal(t, partitionQueuesDao.RunningApps, uint64(0))
	assert.Equal(t, partitionQueuesDao.CurrentPriority, configs.MinPriority)
	assert.Assert(t, partitionQueuesDao.AllocatingAcceptedApps == nil)
	assert.Equal(t, len(partitionQueuesDao.Properties), 1)
	assert.Equal(t, partitionQueuesDao.Properties[configs.ApplicationSortPolicy], policies.FifoSortPolicy.String())
	assert.DeepEqual(t, partitionQueuesDao.TemplateInfo, &templateInfo)

	// assert child root.a fields
	assert.Equal(t, len(partitionQueuesDao.Children), 1)
	child := &partitionQueuesDao.Children[0]
	assert.Equal(t, child.QueueName, "root.a")
	assert.Equal(t, child.Status, objects.Active.String())
	assert.Equal(t, child.Partition, "")
	assert.Assert(t, child.PendingResource == nil)
	assert.DeepEqual(t, child.MaxResource, maxResource.DAOMap())
	assert.DeepEqual(t, child.GuaranteedResource, guaranteedResource.DAOMap())
	assert.Assert(t, child.AllocatedResource == nil)
	assert.Assert(t, child.PreemptingResource == nil)
	assert.DeepEqual(t, child.HeadRoom, maxResource.DAOMap())
	assert.Assert(t, !child.IsLeaf)
	assert.Assert(t, child.IsManaged)
	assert.Equal(t, child.Parent, configs.RootQueue)
	assert.Assert(t, child.AbsUsedCapacity == nil)
	assert.Equal(t, child.MaxRunningApps, uint64(0))
	assert.Equal(t, child.RunningApps, uint64(0))
	assert.Equal(t, child.CurrentPriority, configs.MinPriority)
	assert.Assert(t, child.AllocatingAcceptedApps == nil)
	assert.Equal(t, len(child.Properties), 1)
	assert.Equal(t, child.Properties[configs.ApplicationSortPolicy], policies.FifoSortPolicy.String())
	assert.DeepEqual(t, child.TemplateInfo, &templateInfo)

	// assert child root.a.a1 fields
	assert.Equal(t, len(partitionQueuesDao.Children[0].Children), 1)
	child = &partitionQueuesDao.Children[0].Children[0]
	assert.Equal(t, child.QueueName, "root.a.a1")
	assert.Equal(t, child.Status, objects.Active.String())
	assert.Equal(t, child.Partition, "")
	assert.Assert(t, child.PendingResource == nil)
	assert.DeepEqual(t, child.MaxResource, maxResource.DAOMap())
	assert.DeepEqual(t, child.GuaranteedResource, guaranteedResource.DAOMap())
	assert.Assert(t, child.AllocatedResource == nil)
	assert.Assert(t, child.PreemptingResource == nil)
	assert.DeepEqual(t, child.HeadRoom, maxResource.DAOMap())
	assert.Assert(t, child.IsLeaf)
	assert.Assert(t, child.IsManaged)
	assert.Equal(t, child.Parent, "root.a")
	assert.Assert(t, child.AbsUsedCapacity == nil)
	assert.Equal(t, child.MaxRunningApps, uint64(0))
	assert.Equal(t, child.RunningApps, uint64(0))
	assert.Equal(t, child.CurrentPriority, configs.MinPriority)
	assert.Assert(t, child.AllocatingAcceptedApps == nil)
	assert.Equal(t, len(child.Properties), 1)
	assert.Equal(t, child.Properties[configs.ApplicationSortPolicy], policies.FifoSortPolicy.String())
	assert.Assert(t, child.TemplateInfo == nil)

	// test partition not exists
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queues", strings.NewReader(""))
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "notexists"}}))
	resp = &MockResponseWriter{}
	getPartitionQueues(resp, req)
	assertPartitionNotExists(t, resp)

	// test params name missing
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queues", strings.NewReader(""))
	assert.NilError(t, err)
	resp = &MockResponseWriter{}
	getPartitionQueues(resp, req)
	assertParamsMissing(t, resp)

	// test specific queue
	var partitionQueueDao1 dao.PartitionQueueDAOInfo
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "default"}, httprouter.Param{Key: "queue", Value: "root.a"}}))
	resp = &MockResponseWriter{}
	getPartitionQueue(resp, req)
	err = json.Unmarshal(resp.outputBytes, &partitionQueueDao1)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, partitionQueueDao1.QueueName, "root.a")
	assert.Equal(t, len(partitionQueueDao1.Children), 0)
	assert.Equal(t, len(partitionQueueDao1.ChildNames), 1)
	assert.Equal(t, partitionQueueDao1.ChildNames[0], "root.a.a1")

	// test hierarchy queue
	var partitionQueueDao2 dao.PartitionQueueDAOInfo
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.a?subtree", strings.NewReader(""))
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "default"}, httprouter.Param{Key: "queue", Value: "root.a"}}))
	resp = &MockResponseWriter{}
	getPartitionQueue(resp, req)
	err = json.Unmarshal(resp.outputBytes, &partitionQueueDao2)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, partitionQueueDao2.QueueName, "root.a")
	assert.Equal(t, len(partitionQueueDao2.Children), 1)
	assert.Equal(t, len(partitionQueueDao2.ChildNames), 1)
	assert.Equal(t, partitionQueueDao2.Children[0].QueueName, "root.a.a1")
	assert.Equal(t, partitionQueueDao2.ChildNames[0], "root.a.a1")

	// test partition not exists
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "notexists"}}))
	resp = &MockResponseWriter{}
	getPartitionQueue(resp, req)
	assertPartitionNotExists(t, resp)

	// test params name missing
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
	assert.NilError(t, err)
	resp = &MockResponseWriter{}
	getPartitionQueue(resp, req)
	assertParamsMissing(t, resp)

	// test invalid queue name
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "default"}, httprouter.Param{Key: "queue", Value: "root.notexists@"}}))
	resp = &MockResponseWriter{}
	getPartitionQueue(resp, req)
	assertQueueInvalid(t, resp, "root.notexists@", "notexists@")

	// test queue is not exists
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "default"}, httprouter.Param{Key: "queue", Value: "notexists"}}))
	resp = &MockResponseWriter{}
	getPartitionQueue(resp, req)
	assertQueueNotExists(t, resp)
}
func TestGetClusterInfo(t *testing.T) {
schedulerContext = &scheduler.ClusterContext{}
resp := &MockResponseWriter{}
getClusterInfo(resp, nil)
var data []*dao.ClusterDAOInfo
err := json.Unmarshal(resp.outputBytes, &data)
assert.NilError(t, err)
assert.Equal(t, 0, len(data))
setup(t, configTwoLevelQueues, 2)
resp = &MockResponseWriter{}
getClusterInfo(resp, nil)
err = json.Unmarshal(resp.outputBytes, &data)
assert.NilError(t, err)
assert.Equal(t, 2, len(data))
cs := make(map[string]*dao.ClusterDAOInfo, 2)
for _, d := range data {
cs[d.PartitionName] = d
}
assert.Assert(t, cs["default"] != nil)
assert.Assert(t, cs["gpu"] != nil)
}
// TestGetPartitionNodes adds one application and two nodes, each carrying one
// allocation, to the partition and checks the node list returned by
// getPartitionNodes. It also covers the error responses for an unknown
// partition, missing params and an empty node ID.
func TestGetPartitionNodes(t *testing.T) {
	partition := setup(t, configDefault, 1)

	// create test application
	appID := "app1"
	app := newApplication(appID, partition.Name, queueName, rmID, security.UserGroup{User: "testuser", Groups: []string{"testgroup"}})
	err := partition.AddApplication(app)
	assert.NilError(t, err, "add application to partition should not have failed")

	// create test nodes
	attributesOfnode1 := map[string]string{"Disk": "SSD"}
	attributesOfnode2 := map[string]string{"Devices": "camera"}
	nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}).ToProto()
	node1ID := "node-1"
	node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, Attributes: attributesOfnode1, SchedulableResource: nodeRes})
	node2ID := "node-2"
	node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, Attributes: attributesOfnode2, SchedulableResource: nodeRes})

	// create test allocations
	resAlloc1 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300})
	resAlloc2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500})
	ask1 := objects.NewAllocationAsk("alloc-1", appID, resAlloc1)
	ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2)
	allocs := []*objects.Allocation{objects.NewAllocation(node1ID, ask1)}
	err = partition.AddNode(node1, allocs)
	assert.NilError(t, err, "add node to partition should not have failed")
	allocs = []*objects.Allocation{objects.NewAllocation(node2ID, ask2)}
	err = partition.AddNode(node2, allocs)
	assert.NilError(t, err, "add node to partition should not have failed")

	NewWebApp(schedulerContext, nil)

	// the request error is always checked before the request is used: a failed
	// NewRequest returns a nil request and WithContext would panic on it
	var req *http.Request
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/nodes", strings.NewReader(""))
	assert.NilError(t, err, "Get Nodes for PartitionNodes Handler request failed")
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID}}))
	resp := &MockResponseWriter{}
	var partitionNodesDao []*dao.NodeDAOInfo
	getPartitionNodes(resp, req)
	err = json.Unmarshal(resp.outputBytes, &partitionNodesDao)
	assert.NilError(t, err, unmarshalError)
	// fail cleanly instead of panicking on the index access when nothing is returned
	assert.Assert(t, len(partitionNodesDao) > 0, "expected at least one node in the response")
	assert.Equal(t, 1, len(partitionNodesDao[0].Allocations))
	for _, node := range partitionNodesDao {
		assert.Equal(t, 1, len(node.Allocations))
		if !node.IsReserved {
			assert.Equal(t, len(node.Reservations), 0)
		} else {
			assert.Check(t, len(node.Reservations) > 0, "Get wrong reservation info from node dao")
		}

		// utilisation is the allocation relative to the 1000/1000 node size
		if node.NodeID == node1ID {
			assert.Equal(t, node.NodeID, node1ID)
			assert.Equal(t, "alloc-1", node.Allocations[0].AllocationKey)
			assert.DeepEqual(t, attributesOfnode1, node.Attributes)
			assert.DeepEqual(t, map[string]int64{"memory": 50, "vcore": 30}, node.Utilized)
		} else {
			assert.Equal(t, node.NodeID, node2ID)
			assert.Equal(t, "alloc-2", node.Allocations[0].AllocationKey)
			assert.DeepEqual(t, attributesOfnode2, node.Attributes)
			assert.DeepEqual(t, map[string]int64{"memory": 30, "vcore": 50}, node.Utilized)
		}
	}

	// test partition not exists
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/nodes", strings.NewReader(""))
	assert.NilError(t, err, "Get Nodes for PartitionNodes Handler request failed")
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "notexists"}}))
	resp1 := &MockResponseWriter{}
	getPartitionNodes(resp1, req)
	assertPartitionNotExists(t, resp1)

	// test params name missing
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/nodes", strings.NewReader(""))
	assert.NilError(t, err, "Get Nodes for PartitionNodes Handler request failed")
	resp = &MockResponseWriter{}
	getPartitionNodes(resp, req)
	assertParamsMissing(t, resp)

	// Test specific node
	// NOTE(review): this request carries no "partition" param and the response
	// is never asserted - confirm what the handler is expected to return here.
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/node/node-1", strings.NewReader(""))
	assert.NilError(t, err, "Get Node for PartitionNode Handler request failed")
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "node", Value: "node-1"}}))
	resp = &MockResponseWriter{}
	getPartitionNode(resp, req)

	// Test node id is missing
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/node/node-1", strings.NewReader(""))
	assert.NilError(t, err, "Get Node for PartitionNode Handler request failed")
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition", Value: "default"}, httprouter.Param{Key: "node", Value: ""}}))
	resp = &MockResponseWriter{}
	getPartitionNode(resp, req)
	assertNodeIDNotExists(t, resp)
}
// addApp adds an application with an empty user group to the given partition
// by delegating to addAppWithUserGroup, which asserts the app count and state.
// When isCompleted is set the application is also moved to completed.
func addApp(t *testing.T, id string, part *scheduler.PartitionContext, queueName string, isCompleted bool) *objects.Application {
	// mark as helper so the wrapper frame is skipped when failures are reported
	t.Helper()
	return addAppWithUserGroup(t, id, part, queueName, isCompleted, security.UserGroup{})
}
// addAppWithUserGroup adds an application for the given user group to the
// partition and asserts that it starts in the New state and that the partition
// application count grew by one. When isCompleted is set the application is
// driven through Completing to completed, and the helper waits up to one second
// for the partition's completed application count to increase by one.
func addAppWithUserGroup(t *testing.T, id string, part *scheduler.PartitionContext, queueName string, isCompleted bool, userGroup security.UserGroup) *objects.Application {
	// report assertion failures at the calling test, not inside this helper
	t.Helper()
	initSize := len(part.GetApplications())
	app := newApplication(id, part.Name, queueName, rmID, userGroup)
	err := part.AddApplication(app)
	assert.NilError(t, err, "Failed to add Application to Partition.")
	assert.Equal(t, app.CurrentState(), objects.New.String())
	assert.Equal(t, 1+initSize, len(part.GetApplications()))
	if isCompleted {
		// force the state so that the CompleteApplication event is accepted
		app.SetState(objects.Completing.String())
		currentCount := len(part.GetCompletedApplications())
		err = app.HandleApplicationEvent(objects.CompleteApplication)
		assert.NilError(t, err, "The app should have completed")
		// the completed list is not updated synchronously, so poll for it
		err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
			newCount := len(part.GetCompletedApplications())
			return newCount == currentCount+1
		})
		assert.NilError(t, err, "the completed application should have been processed")
	}
	return app
}
// TestGetQueueApplicationsHandler checks getQueueApplications for a queue
// holding one application with a placeholder ask, and covers an unknown
// partition, an unknown queue, a queue without applications and missing params.
func TestGetQueueApplicationsHandler(t *testing.T) {
	part := setup(t, configDefault, 1)

	// add an application
	app := addApp(t, "app-1", part, "root.default", false)

	// add placeholder to test PlaceholderDAOInfo
	tg := "tg-1"
	res := &si.Resource{
		Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
	}
	ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
		ApplicationID: "app-1",
		PartitionName: part.Name,
		TaskGroupName: tg,
		ResourceAsk:   res,
		Placeholder:   true})
	err := app.AddAllocationAsk(ask)
	assert.NilError(t, err, "ask should have been added to app")
	app.SetTimedOutPlaceholder(tg, 1)

	NewWebApp(schedulerContext, nil)

	// the request error is always checked before the request is used: a failed
	// NewRequest returns a nil request and WithContext would panic on it
	var req *http.Request
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/applications", strings.NewReader(""))
	assert.NilError(t, err, "Get Queue Applications Handler request failed")
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "queue", Value: "root.default"},
	}))
	resp := &MockResponseWriter{}
	var appsDao []*dao.ApplicationDAOInfo
	getQueueApplications(resp, req)
	err = json.Unmarshal(resp.outputBytes, &appsDao)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, len(appsDao), 1)

	if !appsDao[0].HasReserved {
		assert.Equal(t, len(appsDao[0].Reservations), 0)
	} else {
		assert.Check(t, len(appsDao[0].Reservations) > 0, "app should have at least 1 reservation")
	}

	// check PlaceholderData
	assert.Equal(t, len(appsDao[0].PlaceholderData), 1)
	assert.Equal(t, appsDao[0].PlaceholderData[0].TaskGroupName, tg)
	assert.DeepEqual(t, appsDao[0].PlaceholderData[0].MinResource, map[string]int64{"vcore": 1})
	assert.Equal(t, appsDao[0].PlaceholderData[0].Replaced, int64(0))
	assert.Equal(t, appsDao[0].PlaceholderData[0].Count, int64(1))
	assert.Equal(t, appsDao[0].PlaceholderData[0].TimedOut, int64(1))

	// test nonexistent partition
	// each request below derives its context from itself, not from req, so no
	// params of an earlier request are carried along
	var req1 *http.Request
	req1, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/applications", strings.NewReader(""))
	assert.NilError(t, err, "Get Queue Applications Handler request failed")
	req1 = req1.WithContext(context.WithValue(req1.Context(), httprouter.ParamsKey, httprouter.Params{
		httprouter.Param{Key: "partition", Value: "notexists"},
		httprouter.Param{Key: "queue", Value: "root.default"},
	}))
	resp1 := &MockResponseWriter{}
	getQueueApplications(resp1, req1)
	assertPartitionNotExists(t, resp1)

	// test nonexistent queue
	var req2 *http.Request
	req2, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/applications", strings.NewReader(""))
	assert.NilError(t, err, "Get Queue Applications Handler request failed")
	req2 = req2.WithContext(context.WithValue(req2.Context(), httprouter.ParamsKey, httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "queue", Value: "notexists"},
	}))
	resp2 := &MockResponseWriter{}
	getQueueApplications(resp2, req2)
	assertQueueNotExists(t, resp2)

	// test queue without applications
	var req3 *http.Request
	req3, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.noapps/applications", strings.NewReader(""))
	assert.NilError(t, err, "Get Queue Applications Handler request failed")
	req3 = req3.WithContext(context.WithValue(req3.Context(), httprouter.ParamsKey, httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "queue", Value: "root.noapps"},
	}))
	resp3 := &MockResponseWriter{}
	var appsDao3 []*dao.ApplicationDAOInfo
	getQueueApplications(resp3, req3)
	err = json.Unmarshal(resp3.outputBytes, &appsDao3)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, len(appsDao3), 0)

	// test missing params name
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/applications", strings.NewReader(""))
	assert.NilError(t, err, "Get Queue Applications Handler request failed")
	resp = &MockResponseWriter{}
	getQueueApplications(resp, req)
	assertParamsMissing(t, resp)
}
// checkLegalGetAppsRequest sends a GET for url with the given router params to
// getPartitionApplicationsByState and asserts that the response decodes into
// an application list with as many entries as expected. Only the number of
// entries is compared, not their content.
func checkLegalGetAppsRequest(t *testing.T, url string, params httprouter.Params, expected []*dao.ApplicationDAOInfo) {
	// report assertion failures at the calling test, not inside this helper
	t.Helper()
	req, err := http.NewRequest("GET", url, strings.NewReader(""))
	// check the error first: a failed NewRequest returns a nil request
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, params))
	resp := &MockResponseWriter{}
	var appsDao []*dao.ApplicationDAOInfo
	getPartitionApplicationsByState(resp, req)
	err = json.Unmarshal(resp.outputBytes, &appsDao)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, len(appsDao), len(expected))
}
// checkIllegalGetAppsRequest sends a GET for url with the given router params
// to getPartitionApplicationsByState and hands the raw response to assertFunc,
// which verifies the expected error response.
func checkIllegalGetAppsRequest(t *testing.T, url string, params httprouter.Params, assertFunc func(t *testing.T, resp *MockResponseWriter)) {
	// report assertion failures at the calling test, not inside this helper
	t.Helper()
	req, err := http.NewRequest("GET", url, strings.NewReader(""))
	// check the error first: a failed NewRequest returns a nil request
	assert.NilError(t, err)
	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, params))
	resp := &MockResponseWriter{}
	getPartitionApplicationsByState(resp, req)
	assertFunc(t, resp)
}
// TestGetPartitionApplicationsByStateHandler checks
// getPartitionApplicationsByState for the Active, Completed and Rejected
// states, the Active state filtered by status, and the error responses for an
// unknown partition, a disallowed state, a disallowed active status and
// missing params.
func TestGetPartitionApplicationsByStateHandler(t *testing.T) {
	defaultPartition := setup(t, configDefault, 1)
	NewWebApp(schedulerContext, nil)

	// add a new application
	app1 := addApp(t, "app-1", defaultPartition, "root.default", false)
	app1.SetState(objects.New.String())

	// add a running application
	app2 := addApp(t, "app-2", defaultPartition, "root.default", false)
	app2.SetState(objects.Running.String())

	// add a completed application
	app3 := addApp(t, "app-3", defaultPartition, "root.default", true)

	// add a rejected application: app4 itself is the one that gets rejected,
	// not the already completed app3
	app4 := newApplication("app-4", defaultPartition.Name, "root.default", rmID, security.UserGroup{})
	rejectedMessage := fmt.Sprintf("Failed to place application %s: application rejected: no placement rule matched", app4.ApplicationID)
	defaultPartition.AddRejectedApplication(app4, rejectedMessage)

	// test get active app
	expectedActiveDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app1), getApplicationDAO(app2)}
	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "state", Value: "Active"}}, expectedActiveDao)

	// test get active app with running state
	expectedRunningDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app2)}
	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active?status=Running", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "state", Value: "Active"}}, expectedRunningDao)

	// test get completed app
	expectedCompletedDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app3)}
	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Completed", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "state", Value: "Completed"}}, expectedCompletedDao)

	// test get rejected app
	expectedRejectedDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app4)}
	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Rejected", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "state", Value: "Rejected"}}, expectedRejectedDao)

	// test nonexistent partition
	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active", httprouter.Params{
		httprouter.Param{Key: "partition", Value: "notexists"},
		httprouter.Param{Key: "state", Value: "Active"}}, assertPartitionNotExists)

	// test disallow state
	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Accepted", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "state", Value: "Accepted"}}, assertAppStateNotAllow)

	// test disallow active state
	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active?status=invalid", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "state", Value: "Active"}}, assertActiveStateNotAllow)

	// test missing params name
	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active", nil, assertParamsMissing)
}
// TestGetApplicationHandler exercises getApplication for an existing
// application and for its error paths: unknown partition, unknown queue,
// unknown application, a request without a queue param, an invalid queue
// name and a request that carries no router params at all.
func TestGetApplicationHandler(t *testing.T) {
	part := setup(t, configDefault, 1)

	// add 1 application
	app := addApp(t, "app-1", part, "root.default", false)
	res := &si.Resource{
		Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
	}
	ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{
		ApplicationID: "app-1",
		PartitionName: part.Name,
		ResourceAsk:   res})
	err := app.AddAllocationAsk(ask)
	assert.NilError(t, err, "ask should have been added to app")

	NewWebApp(schedulerContext, nil)

	// newRequest builds a GET request and, when params is non-nil, attaches
	// them under httprouter.ParamsKey. The NewRequest error is checked before
	// the request is used, so a failure is reported as a test failure rather
	// than a nil pointer dereference. Each request gets its own context.
	newRequest := func(url string, params httprouter.Params) *http.Request {
		r, reqErr := http.NewRequest("GET", url, strings.NewReader(""))
		assert.NilError(t, reqErr, "Get Application Handler request failed")
		if params == nil {
			return r
		}
		return r.WithContext(context.WithValue(r.Context(), httprouter.ParamsKey, params))
	}

	const appURL = "/ws/v1/partition/default/queue/root.default/application/app-1"

	// existing application
	req := newRequest(appURL, httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "queue", Value: "root.default"},
		httprouter.Param{Key: "application", Value: "app-1"},
	})
	resp := &MockResponseWriter{}
	var appsDao *dao.ApplicationDAOInfo
	getApplication(resp, req)
	err = json.Unmarshal(resp.outputBytes, &appsDao)
	assert.NilError(t, err, unmarshalError)
	if !appsDao.HasReserved {
		assert.Equal(t, len(appsDao.Reservations), 0)
	} else {
		assert.Check(t, len(appsDao.Reservations) > 0, "app should have at least 1 reservation")
	}

	// test nonexistent partition
	req1 := newRequest(appURL, httprouter.Params{
		httprouter.Param{Key: "partition", Value: "notexists"},
		httprouter.Param{Key: "queue", Value: "root.default"},
		httprouter.Param{Key: "application", Value: "app-1"},
	})
	resp1 := &MockResponseWriter{}
	getApplication(resp1, req1)
	assertPartitionNotExists(t, resp1)

	// test nonexistent queue
	req2 := newRequest(appURL, httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "queue", Value: "notexists"},
		httprouter.Param{Key: "application", Value: "app-1"},
	})
	resp2 := &MockResponseWriter{}
	getApplication(resp2, req2)
	assertQueueNotExists(t, resp2)

	// test nonexistent application
	req3 := newRequest("/ws/v1/partition/default/queue/root.noapps/application/app-1", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "queue", Value: "root.noapps"},
		httprouter.Param{Key: "application", Value: "app-1"},
	})
	resp3 := &MockResponseWriter{}
	getApplication(resp3, req3)
	assertApplicationNotExists(t, resp3)

	// test without queue
	req4 := newRequest("/ws/v1/partition/default/application/app-1", httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "application", Value: "app-1"},
	})
	resp4 := &MockResponseWriter{}
	var appsDao4 *dao.ApplicationDAOInfo
	getApplication(resp4, req4)
	err = json.Unmarshal(resp4.outputBytes, &appsDao4)
	assert.NilError(t, err, unmarshalError)

	// test invalid queue name
	req5 := newRequest(appURL, httprouter.Params{
		httprouter.Param{Key: "partition", Value: partitionNameWithoutClusterID},
		httprouter.Param{Key: "queue", Value: "root.test.test123@"},
		httprouter.Param{Key: "application", Value: "app-1"},
	})
	resp5 := &MockResponseWriter{}
	getApplication(resp5, req5)
	assertQueueInvalid(t, resp5, "root.test.test123@", "test123@")

	// test missing params name
	req = newRequest(appURL, nil)
	resp = &MockResponseWriter{}
	getApplication(resp, req)
	assertParamsMissing(t, resp)
}
// assertParamsMissing checks that resp holds a YAPIError with status
// 400 Bad Request and the MissingParamsName message.
func assertParamsMissing(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, MissingParamsName, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
}
// assertPartitionNotExists checks that resp holds a YAPIError with status
// 404 Not Found and the PartitionDoesNotExists message.
func assertPartitionNotExists(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusNotFound, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, PartitionDoesNotExists, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
// assertQueueNotExists checks that resp holds a YAPIError with status
// 404 Not Found and the QueueDoesNotExists message.
func assertQueueNotExists(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusNotFound, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, QueueDoesNotExists, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
// assertQueueInvalid checks that resp holds a YAPIError with status
// 400 Bad Request whose message is the queue-parsing error built from
// invalidQueuePath (the full path) and invalidQueueName (the offending part).
func assertQueueInvalid(t *testing.T, resp *MockResponseWriter, invalidQueuePath string, invalidQueueName string) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, "problem in queue query parameter parsing as queue param "+invalidQueuePath+" contains invalid queue name "+invalidQueueName+". Queue name must only have alphanumeric characters, - or _, and be no longer than 64 characters except the recovery queue root.@recovery@", jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
}
// assertApplicationNotExists checks that resp holds a YAPIError with status
// 404 Not Found and the ApplicationDoesNotExists message.
func assertApplicationNotExists(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusNotFound, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, ApplicationDoesNotExists, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
// assertUserExists decodes the response body into a UserResourceUsageDAOInfo
// and checks that it is deeply equal to expected.
func assertUserExists(t *testing.T, resp *MockResponseWriter, expected *dao.UserResourceUsageDAOInfo) {
	t.Helper() // report failures at the caller's line
	var actual *dao.UserResourceUsageDAOInfo
	err := json.Unmarshal(resp.outputBytes, &actual)
	assert.NilError(t, err, unmarshalError)
	assert.DeepEqual(t, actual, expected)
}
// assertUserNotExists checks that resp holds a YAPIError with status
// 404 Not Found and the UserDoesNotExists message.
func assertUserNotExists(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusNotFound, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, UserDoesNotExists, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
// assertUserNameMissing checks that resp holds a YAPIError with status
// 400 Bad Request and the UserNameMissing message.
func assertUserNameMissing(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, UserNameMissing, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
}
// assertGroupExists decodes the response body into a GroupResourceUsageDAOInfo
// and checks that it is deeply equal to expected.
func assertGroupExists(t *testing.T, resp *MockResponseWriter, expected *dao.GroupResourceUsageDAOInfo) {
	t.Helper() // report failures at the caller's line
	var actual *dao.GroupResourceUsageDAOInfo
	err := json.Unmarshal(resp.outputBytes, &actual)
	assert.NilError(t, err, unmarshalError)
	assert.DeepEqual(t, actual, expected)
}
// assertGroupNotExists checks that resp holds a YAPIError with status
// 404 Not Found and the GroupDoesNotExists message.
func assertGroupNotExists(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusNotFound, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, GroupDoesNotExists, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
// assertGroupNameMissing checks that resp holds a YAPIError with status
// 400 Bad Request and the GroupNameMissing message.
func assertGroupNameMissing(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, GroupNameMissing, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
}
// assertNodeIDNotExists checks that resp holds a YAPIError with status
// 404 Not Found and the NodeDoesNotExists message.
func assertNodeIDNotExists(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusNotFound, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, NodeDoesNotExists, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
// assertAppStateNotAllow checks that resp holds a YAPIError with status
// 400 Bad Request and the message listing the allowed application states.
func assertAppStateNotAllow(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, "Only following application states are allowed: active, rejected, completed", jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
}
// assertActiveStateNotAllow checks that resp holds a YAPIError with status
// 400 Bad Request and the allowedActiveStatusMsg message.
func assertActiveStateNotAllow(t *testing.T, resp *MockResponseWriter) {
	t.Helper() // report failures at the caller's line
	var errInfo dao.YAPIError
	err := json.Unmarshal(resp.outputBytes, &errInfo)
	assert.NilError(t, err, unmarshalError)
	assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
	assert.Equal(t, errInfo.Message, allowedActiveStatusMsg, jsonMessageError)
	assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
}
func TestValidateQueue(t *testing.T) {
err := validateQueue("root.test.test123")
assert.NilError(t, err, "Queue path is correct but still throwing error.")
invalidQueuePath := "root.test.test123@"
invalidQueueName := "test123@"
err1 := validateQueue(invalidQueuePath)
assert.Error(t, err1, "problem in queue query parameter parsing as queue param "+invalidQueuePath+" contains invalid queue name "+invalidQueueName+". Queue name must only have alphanumeric characters, - or _, and be no longer than 64 characters except the recovery queue root.@recovery@")
err2 := validateQueue("root")
assert.NilError(t, err2, "Queue path is correct but still throwing error.")
err3 := validateQueue("root.@recovery@")
assert.NilError(t, err3, "Queue path is correct but still throwing error.")
}
// TestFullStateDumpPath sets a config map containing a log level, adds an
// application to the partition, calls getFullStateDump and verifies that the
// response is not an internal server error and decodes into an
// AggregatedStateInfo that passes verifyStateDumpJSON.
func TestFullStateDumpPath(t *testing.T) {
	// restore the config map on exit so this test leaves no global residue
	original := configs.GetConfigMap()
	defer func() {
		configs.SetConfigMap(original)
	}()
	configMap := map[string]string{
		"log.level": "WARN",
	}
	configs.SetConfigMap(configMap)

	schedulerContext = prepareSchedulerContext(t)

	// named "partition" rather than "context" so the imported context
	// package is not shadowed inside this function
	partitions := schedulerContext.GetPartitionMapClone()
	partition := partitions[normalizedPartitionName]
	app := newApplication("appID", normalizedPartitionName, "root.default", rmID, security.UserGroup{})
	err := partition.AddApplication(app)
	assert.NilError(t, err, "failed to add Application to partition")

	imHistory = history.NewInternalMetricsHistory(5)
	req, err := http.NewRequest("GET", "/ws/v1/getfullstatedump", strings.NewReader(""))
	assert.NilError(t, err)
	resp := &MockResponseWriter{}

	getFullStateDump(resp, req)
	statusCode := resp.statusCode
	assert.Check(t, statusCode != http.StatusInternalServerError, "response status code")
	var aggregated AggregatedStateInfo
	err = json.Unmarshal(resp.outputBytes, &aggregated)
	assert.NilError(t, err)
	verifyStateDumpJSON(t, &aggregated)
}
// TestSpecificUserAndGroupResourceUsage exercises getUserResourceUsage and
// getGroupResourceUsage for a missing name, an existing name, an unknown name
// and a request that carries no router params.
func TestSpecificUserAndGroupResourceUsage(t *testing.T) {
	prepareUserAndGroupContext(t, groupsLimitsConfig)

	const (
		userURL     = "/ws/v1/partition/default/usage/user/"
		groupURL    = "/ws/v1/partition/default/usage/group/"
		userReqMsg  = "Get User Resource Usage Handler request failed"
		groupReqMsg = "Get Group Resource Usage Handler request failed"
	)

	// newRequest builds a GET request and, when params is non-nil, attaches
	// them under httprouter.ParamsKey. The NewRequest error is checked before
	// the request is used, so a failure cannot turn into a nil dereference.
	newRequest := func(url, failMsg string, params httprouter.Params) *http.Request {
		r, reqErr := http.NewRequest("GET", url, strings.NewReader(""))
		assert.NilError(t, reqErr, failMsg)
		if params == nil {
			return r
		}
		return r.WithContext(context.WithValue(r.Context(), httprouter.ParamsKey, params))
	}

	// Test user name is missing
	req := newRequest(userURL, userReqMsg, httprouter.Params{
		httprouter.Param{Key: "group", Value: "testgroup"},
	})
	resp := &MockResponseWriter{}
	getUserResourceUsage(resp, req)
	assertUserNameMissing(t, resp)

	// Test group name is missing
	req = newRequest(groupURL, groupReqMsg, httprouter.Params{
		httprouter.Param{Key: "user", Value: "testuser"},
		httprouter.Param{Key: "group", Value: ""},
	})
	resp = &MockResponseWriter{}
	getGroupResourceUsage(resp, req)
	assertGroupNameMissing(t, resp)

	// Test existed user query
	req = newRequest(userURL, userReqMsg, httprouter.Params{
		httprouter.Param{Key: "user", Value: "testuser"},
		httprouter.Param{Key: "group", Value: "testgroup"},
	})
	resp = &MockResponseWriter{}
	getUserResourceUsage(resp, req)
	assertUserExists(t, resp,
		&dao.UserResourceUsageDAOInfo{
			UserName: "testuser",
			Groups:   map[string]string{"app-1": "testgroup"},
			Queues: &dao.ResourceUsageDAOInfo{
				QueuePath:           "root",
				ResourceUsage:       resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1}),
				RunningApplications: []string{"app-1"},
				Children: []*dao.ResourceUsageDAOInfo{
					{
						QueuePath:           "root.default",
						ResourceUsage:       resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1}),
						RunningApplications: []string{"app-1"},
					},
				},
			},
		})

	// Test non-existing user query
	req = newRequest(userURL, userReqMsg, httprouter.Params{
		httprouter.Param{Key: "user", Value: "testNonExistingUser"},
		httprouter.Param{Key: "group", Value: "testgroup"},
	})
	resp = &MockResponseWriter{}
	getUserResourceUsage(resp, req)
	assertUserNotExists(t, resp)

	// Test existed group query
	req = newRequest(groupURL, groupReqMsg, httprouter.Params{
		httprouter.Param{Key: "user", Value: "testuser"},
		httprouter.Param{Key: "group", Value: "testgroup"},
	})
	resp = &MockResponseWriter{}
	getGroupResourceUsage(resp, req)
	assertGroupExists(t, resp,
		&dao.GroupResourceUsageDAOInfo{
			GroupName:    "testgroup",
			Applications: []string{"app-1"},
			Queues: &dao.ResourceUsageDAOInfo{
				QueuePath:           "root",
				ResourceUsage:       resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1}),
				RunningApplications: []string{"app-1"},
				Children: []*dao.ResourceUsageDAOInfo{
					{
						QueuePath:           "root.default",
						ResourceUsage:       resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1}),
						MaxResources:        resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 200}),
						RunningApplications: []string{"app-1"},
					},
				},
			},
		})

	// Test non-existing group query
	req = newRequest(groupURL, groupReqMsg, httprouter.Params{
		httprouter.Param{Key: "user", Value: "testuser"},
		httprouter.Param{Key: "group", Value: "testNonExistingGroup"},
	})
	resp = &MockResponseWriter{}
	getGroupResourceUsage(resp, req)
	assertGroupNotExists(t, resp)

	// Test params name missing
	req = newRequest(groupURL, groupReqMsg, nil)
	resp = &MockResponseWriter{}
	getGroupResourceUsage(resp, req)
	assertParamsMissing(t, resp)
}
// TestUsersAndGroupsResourceUsage checks getUsersResourceUsage and
// getGroupsResourceUsage, first against a context with one tracked user and
// group, then against an empty context where both must return empty lists.
func TestUsersAndGroupsResourceUsage(t *testing.T) {
	prepareUserAndGroupContext(t, groupsLimitsConfig)

	req, err := http.NewRequest("GET", "/ws/v1/partition/default/usage/users", strings.NewReader(""))
	assert.NilError(t, err, "Get Users Resource Usage Handler request failed")
	resp := &MockResponseWriter{}
	var usersResourceUsageDao []*dao.UserResourceUsageDAOInfo
	getUsersResourceUsage(resp, req)
	err = json.Unmarshal(resp.outputBytes, &usersResourceUsageDao)
	assert.NilError(t, err, unmarshalError)

	// Assert existing users; the length is checked before indexing so an
	// empty result fails the test instead of panicking.
	assert.Equal(t, len(usersResourceUsageDao), 1)
	assert.Equal(t, usersResourceUsageDao[0].Queues.ResourceUsage.String(),
		resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU: 1}).String())
	assert.Equal(t, usersResourceUsageDao[0].UserName, "testuser")

	req, err = http.NewRequest("GET", "/ws/v1/partition/default/usage/groups", strings.NewReader(""))
	assert.NilError(t, err, "Get Groups Resource Usage Handler request failed")
	var groupsResourceUsageDao []*dao.GroupResourceUsageDAOInfo
	resp = &MockResponseWriter{}
	getGroupsResourceUsage(resp, req)
	err = json.Unmarshal(resp.outputBytes, &groupsResourceUsageDao)
	assert.NilError(t, err, unmarshalError)

	// Assert existing groups; length first, for the same reason as above.
	assert.Equal(t, len(groupsResourceUsageDao), 1)
	assert.Equal(t, groupsResourceUsageDao[0].Queues.ResourceUsage.String(),
		resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU: 1}).String())
	assert.Equal(t, groupsResourceUsageDao[0].GroupName, "testgroup")

	// test empty user group
	prepareEmptyUserGroupContext()
	req, err = http.NewRequest("GET", "/ws/v1/partition/default/usage/users", strings.NewReader(""))
	assert.NilError(t, err, "Get Users Resource Usage Handler request failed")
	resp = &MockResponseWriter{}
	getUsersResourceUsage(resp, req)
	var userResourceUsageDao []*dao.UserResourceUsageDAOInfo
	err = json.Unmarshal(resp.outputBytes, &userResourceUsageDao)
	assert.NilError(t, err, unmarshalError)
	assert.DeepEqual(t, userResourceUsageDao, []*dao.UserResourceUsageDAOInfo{})

	req, err = http.NewRequest("GET", "/ws/v1/partition/default/usage/groups", strings.NewReader(""))
	assert.NilError(t, err, "Get Groups Resource Usage Handler request failed")
	resp = &MockResponseWriter{}
	getGroupsResourceUsage(resp, req)
	var groupResourceUsageDao []*dao.GroupResourceUsageDAOInfo
	err = json.Unmarshal(resp.outputBytes, &groupResourceUsageDao)
	assert.NilError(t, err, unmarshalError)
	assert.DeepEqual(t, groupResourceUsageDao, []*dao.GroupResourceUsageDAOInfo{})
}
// TestGetEvents stores three events and verifies the batch endpoint for the
// full list, for single-event queries and for malformed query parameters.
func TestGetEvents(t *testing.T) {
	prepareSchedulerContext(t)
	appEvent, nodeEvent, queueEvent := addEvents(t)

	checkAllEvents(t, []*si.EventRecord{appEvent, nodeEvent, queueEvent})
	checkSingleEvent(t, appEvent, "count=1")
	checkSingleEvent(t, queueEvent, "start=2")

	// illegal requests
	illegal := []struct {
		query string
		msg   string
	}{
		{"count=xyz", `strconv.ParseUint: parsing "xyz": invalid syntax`},
		{"count=-100", `strconv.ParseUint: parsing "-100": invalid syntax`},
		{"count=0", `0 is not a valid value for "count`},
		{"start=xyz", `strconv.ParseUint: parsing "xyz": invalid syntax`},
		{"start=-100", `strconv.ParseUint: parsing "-100": invalid syntax`},
	}
	for _, tc := range illegal {
		checkIllegalBatchRequest(t, tc.query, tc.msg)
	}
}
// TestGetEventsWhenTrackingDisabled turns event tracking off through the
// config map and expects the batch endpoint to answer with a 500 and the
// "Event tracking is disabled" message.
func TestGetEventsWhenTrackingDisabled(t *testing.T) {
	saved := configs.GetConfigMap()
	defer func() {
		// stop whichever event system is current, then restore the config
		current := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
		current.Stop()
		configs.SetConfigMap(saved)
	}()

	configs.SetConfigMap(map[string]string{
		configs.CMEventTrackingEnabled: "false",
	})
	events.Init()
	eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
	eventSystem.StartServiceWithPublisher(false)

	batchReq, err := http.NewRequest("GET", "/ws/v1/events/batch", strings.NewReader(""))
	assert.NilError(t, err)
	readIllegalRequest(t, batchReq, http.StatusInternalServerError, "Event tracking is disabled")
}
// TestGetStream starts a streaming request, publishes three events from a
// background goroutine, cancels the request and verifies that the response
// holds the instance UUID line followed by the three events in order.
func TestGetStream(t *testing.T) {
	setup(t, configDefault, 1)
	ev, req := initEventsAndCreateRequest(t)
	defer ev.Stop()
	cancelCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req = req.Clone(cancelCtx)
	resp := NewResponseRecorderWithDeadline() // MockResponseWriter does not implement http.Flusher

	go func() {
		time.Sleep(200 * time.Millisecond)
		ev.AddEvent(&si.EventRecord{
			TimestampNano: 111,
			ObjectID:      "app-1",
		})
		ev.AddEvent(&si.EventRecord{
			TimestampNano: 222,
			ObjectID:      "node-1",
		})
		ev.AddEvent(&si.EventRecord{
			TimestampNano: 333,
			ObjectID:      "app-2",
		})
		time.Sleep(200 * time.Millisecond)
		// cancelling the request context is what makes getStream return below
		cancel()
	}()
	getStream(resp, req)

	output := make([]byte, 256)
	n, err := resp.Body.Read(output)
	assert.NilError(t, err, "cannot read response body")

	lines := strings.Split(string(output[:n]), "\n")
	// guard the indexing below: fail with a message instead of panicking
	assert.Assert(t, len(lines) >= 4, "expected instance UUID and 3 events, got %d lines", len(lines))
	assertInstanceUUID(t, lines[0])
	assertEvent(t, lines[1], 111, "app-1")
	assertEvent(t, lines[2], 222, "node-1")
	assertEvent(t, lines[3], 333, "app-2")
}
// TestGetStream_StreamClosedByProducer publishes one event and then closes
// all streams on the event system; the response must hold the instance UUID,
// the event and a final error stating the producer closed the stream.
func TestGetStream_StreamClosedByProducer(t *testing.T) {
	ev, req := initEventsAndCreateRequest(t)
	defer ev.Stop()
	resp := NewResponseRecorderWithDeadline() // MockResponseWriter does not implement http.Flusher

	go func() {
		time.Sleep(200 * time.Millisecond)
		ev.AddEvent(&si.EventRecord{
			TimestampNano: 111,
			ObjectID:      "app-1",
		})
		time.Sleep(100 * time.Millisecond)
		ev.CloseAllStreams()
	}()

	getStream(resp, req)

	output := make([]byte, 256)
	n, err := resp.Body.Read(output)
	assert.Equal(t, http.StatusOK, resp.Code)
	assert.NilError(t, err, "cannot read response body")
	lines := strings.Split(string(output[:n]), "\n")
	// guard the indexing below: fail with a message instead of panicking
	assert.Assert(t, len(lines) >= 3, "expected instance UUID, 1 event and an error, got %d lines", len(lines))
	assertInstanceUUID(t, lines[0])
	assertEvent(t, lines[1], 111, "app-1")
	assertYunikornError(t, lines[2], "Event stream was closed by the producer")
}
// TestGetStream_NotFlusherImpl calls getStream with a MockResponseWriter and
// expects a 500 response mentioning the missing http.Flusher implementation.
func TestGetStream_NotFlusherImpl(t *testing.T) {
	streamReq, err := http.NewRequest("GET", "/ws/v1/events/stream", strings.NewReader(""))
	assert.NilError(t, err)

	writer := &MockResponseWriter{}
	getStream(writer, streamReq)

	body := string(writer.outputBytes)
	assert.Assert(t, strings.Contains(body, "Writer does not implement http.Flusher"))
	assert.Equal(t, http.StatusInternalServerError, writer.statusCode)
}
// TestGetStream_Count verifies the "count" query parameter of the stream
// endpoint: not set, set to "2" and set to an unparsable value.
func TestGetStream_Count(t *testing.T) {
	setup(t, configDefault, 1)
	ev, req := initEventsAndCreateRequest(t)
	defer ev.Stop()
	cancelCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req = req.Clone(cancelCtx)
	resp := NewResponseRecorderWithDeadline() // MockResponseWriter does not implement http.Flusher

	// add some existing events
	ev.AddEvent(&si.EventRecord{TimestampNano: 0})
	ev.AddEvent(&si.EventRecord{TimestampNano: 1})
	ev.AddEvent(&si.EventRecord{TimestampNano: 2})
	time.Sleep(100 * time.Millisecond) // let the events propagate

	// case #1: "count" not set
	// The cancel func is passed by value so the goroutine is bound to this
	// case's context even though the cancel variable is reassigned later.
	go func(stop context.CancelFunc) {
		time.Sleep(100 * time.Millisecond)
		stop()
	}(cancel)
	getStream(resp, req)
	output := make([]byte, 256)
	n, err := resp.Body.Read(output)
	// this read error used to be silently overwritten by the next assignment
	assert.NilError(t, err)
	lines := strings.Split(string(output[:n]), "\n")
	assertInstanceUUID(t, lines[0])

	// case #2: "count" is set to "2"
	req, err = http.NewRequest("GET", "/ws/v1/events/stream", strings.NewReader(""))
	assert.NilError(t, err)
	cancelCtx, cancel = context.WithCancel(context.Background())
	req = req.Clone(cancelCtx)
	defer cancel()
	req.URL.RawQuery = "count=2"
	go func(stop context.CancelFunc) {
		time.Sleep(100 * time.Millisecond)
		stop()
	}(cancel)
	getStream(resp, req)
	output = make([]byte, 256)
	n, err = resp.Body.Read(output)
	assert.NilError(t, err)
	lines = strings.Split(string(output[:n]), "\n")
	// guard the indexing below: fail with a message instead of panicking
	assert.Assert(t, len(lines) >= 3, "expected instance UUID and 2 events, got %d lines", len(lines))
	assertInstanceUUID(t, lines[0])
	assertEvent(t, lines[1], 1, "")
	assertEvent(t, lines[2], 2, "")

	// case #3: illegal value
	req, err = http.NewRequest("GET", "/ws/v1/events/stream", strings.NewReader(""))
	assert.NilError(t, err)
	cancelCtx, cancel = context.WithCancel(context.Background())
	req = req.Clone(cancelCtx)
	defer cancel()
	req.URL.RawQuery = "count=xyz"
	getStream(resp, req)
	output = make([]byte, 256)
	n, err = resp.Body.Read(output)
	assert.NilError(t, err)
	line := string(output[:n])
	assertYunikornError(t, line, `strconv.ParseUint: parsing "xyz": invalid syntax`)
}
// TestGetStream_TrackingDisabled turns event tracking off through the config
// map and expects the stream endpoint to answer with a 500 and the
// "Event tracking is disabled" message.
func TestGetStream_TrackingDisabled(t *testing.T) {
	saved := configs.GetConfigMap()
	defer func() {
		// stop whichever event system is current, then restore the config
		current := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
		current.Stop()
		configs.SetConfigMap(saved)
	}()

	configs.SetConfigMap(map[string]string{
		configs.CMEventTrackingEnabled: "false",
	})
	_, streamReq := initEventsAndCreateRequest(t)
	recorder := httptest.NewRecorder()

	assertGetStreamError(t, false, streamReq, recorder, http.StatusInternalServerError, "Event tracking is disabled")
}
// TestGetStream_NoWriteDeadline streams into a plain httptest recorder and
// expects a 500 reporting that the write deadline cannot be set.
func TestGetStream_NoWriteDeadline(t *testing.T) {
	eventSystem, streamReq := initEventsAndCreateRequest(t)
	defer eventSystem.Stop()

	// does not have SetWriteDeadline()
	plainRecorder := httptest.NewRecorder()
	assertGetStreamError(t, false, streamReq, plainRecorder,
		http.StatusInternalServerError, "Cannot set write deadline: feature not supported")
}
// TestGetStream_SetWriteDeadlineFails configures the recorder so that its
// second SetWriteDeadline() call fails and expects the stream to carry the
// instance UUID followed by the write-deadline error, with status 200.
func TestGetStream_SetWriteDeadlineFails(t *testing.T) {
	setup(t, configDefault, 1)
	eventSystem, streamReq := initEventsAndCreateRequest(t)
	defer eventSystem.Stop()

	recorder := NewResponseRecorderWithDeadline()
	recorder.setWriteFailsAt = 2 // only the second SetWriteDeadline() will fail
	recorder.setWriteFails = true

	// publish a single event a little later from the background
	go func() {
		time.Sleep(200 * time.Millisecond)
		record := &si.EventRecord{
			TimestampNano: 111,
			ObjectID:      "app-1",
		}
		eventSystem.AddEvent(record)
	}()

	getStream(recorder, streamReq)
	result := recorder.Result()
	checkGetStreamErrorResult(t, true, result, http.StatusOK, "Cannot set write deadline: SetWriteDeadline failed")
}
// TestGetStream_SetReadDeadlineFails makes the recorder's SetReadDeadline
// fail and expects a 500 reporting that the read deadline cannot be set.
func TestGetStream_SetReadDeadlineFails(t *testing.T) {
	ev, req := initEventsAndCreateRequest(t)
	// stop the event system started by the helper, as the sibling tests do
	defer ev.Stop()
	resp := NewResponseRecorderWithDeadline()
	resp.setReadFails = true

	assertGetStreamError(t, false, req, resp, http.StatusInternalServerError, "Cannot set read deadline: SetReadDeadline failed")
}
// TestGetStream_Limit caps the number of event streams at three via the
// config map, opens three streams in the background and expects a fourth
// request to be rejected with 503 "Too many streaming connections".
func TestGetStream_Limit(t *testing.T) {
	saved := configs.GetConfigMap()
	defer func() {
		configs.SetConfigMap(saved)
	}()
	limitConfig := map[string]string{
		configs.CMMaxEventStreams: "3",
	}
	configs.SetConfigMap(limitConfig)

	rejected := NewResponseRecorderWithDeadline()
	eventSystem, streamReq := initEventsAndCreateRequest(t)
	defer eventSystem.Stop()
	ctx, stop := context.WithCancel(context.Background())
	streamReq = streamReq.Clone(ctx)
	defer stop()
	streamReq.Host = "host-1"

	// start simulated connections in the background
	for i := 0; i < 3; i++ {
		go getStream(NewResponseRecorderWithDeadline(), streamReq)
	}

	// wait until the StreamingLimiter.AddHost() calls
	waitErr := common.WaitFor(time.Millisecond, time.Second, func() bool {
		streamingLimiter.Lock()
		defer streamingLimiter.Unlock()
		return streamingLimiter.streams == 3
	})
	assert.NilError(t, waitErr)

	assertGetStreamError(t, false, streamReq, rejected, http.StatusServiceUnavailable, "Too many streaming connections")
}
// assertGetStreamError runs getStream against the given recorder and request
// and hands the recorded result to checkGetStreamErrorResult. resp must be a
// *ResponseRecorderWithDeadline or a *httptest.ResponseRecorder; any other
// type fails the test.
func assertGetStreamError(t *testing.T, withUUID bool, req *http.Request, resp interface{}, statusCode int, expectedMsg string) {
	t.Helper()
	var result *http.Response
	if withDeadline, ok := resp.(*ResponseRecorderWithDeadline); ok {
		getStream(withDeadline, req)
		result = withDeadline.Result()
	} else if plain, ok := resp.(*httptest.ResponseRecorder); ok {
		getStream(plain, req)
		result = plain.Result()
	} else {
		t.Fatalf("unknown response recorder type")
	}
	checkGetStreamErrorResult(t, withUUID, result, statusCode, expectedMsg)
}
// checkGetStreamErrorResult reads up to 256 bytes of the response body and
// verifies the error it contains plus the HTTP status code. When withUUID is
// true the first line must be the instance UUID and the second the error;
// otherwise the whole body is treated as the error.
func checkGetStreamErrorResult(t *testing.T, withUUID bool, response *http.Response, statusCode int, expectedMsg string) {
	t.Helper()
	buf := make([]byte, 256)
	read, err := response.Body.Read(buf)
	assert.NilError(t, err)

	body := string(buf[:read])
	if !withUUID {
		assertYunikornError(t, body, expectedMsg)
	} else {
		parts := strings.Split(body, "\n")
		assertInstanceUUID(t, parts[0])
		assertYunikornError(t, parts[1], expectedMsg)
	}
	assert.Equal(t, statusCode, response.StatusCode)
}
// initEventsAndCreateRequest initialises the event system, starts its service
// with StartServiceWithPublisher(false) and returns it together with a GET
// request for the event stream endpoint.
func initEventsAndCreateRequest(t *testing.T) (*events.EventSystemImpl, *http.Request) {
	t.Helper()
	events.Init()
	eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
	eventSystem.StartServiceWithPublisher(false)

	streamReq, err := http.NewRequest("GET", "/ws/v1/events/stream", strings.NewReader(""))
	assert.NilError(t, err)
	return eventSystem, streamReq
}
// assertEvent decodes output as a JSON si.EventRecord and checks its
// timestamp and object ID against tsNano and objectID.
func assertEvent(t *testing.T, output string, tsNano int64, objectID string) {
	t.Helper()
	record := si.EventRecord{}
	decodeErr := json.Unmarshal([]byte(output), &record)
	assert.NilError(t, decodeErr)
	assert.Equal(t, tsNano, record.TimestampNano)
	assert.Equal(t, objectID, record.ObjectID)
}
// assertInstanceUUID decodes output as a JSON dao.YunikornID and checks that
// its InstanceUUID is not empty.
func assertInstanceUUID(t *testing.T, output string) {
	t.Helper() // report failures at the caller's line, like assertEvent does
	var id dao.YunikornID
	err := json.Unmarshal([]byte(output), &id)
	assert.NilError(t, err)
	assert.Assert(t, id.InstanceUUID != "")
}
// assertYunikornError decodes output as a JSON dao.YAPIError and checks that
// both its Description and its Message equal errMsg.
func assertYunikornError(t *testing.T, output, errMsg string) {
	t.Helper()
	apiErr := dao.YAPIError{}
	decodeErr := json.Unmarshal([]byte(output), &apiErr)
	assert.NilError(t, decodeErr)
	assert.Equal(t, errMsg, apiErr.Description)
	assert.Equal(t, errMsg, apiErr.Message)
}
// addEvents initialises and starts the event system, publishes one
// application, one node and one queue event (in that order), waits until the
// store reports three stored events and returns the three records.
func addEvents(t *testing.T) (appEvent, nodeEvent, queueEvent *si.EventRecord) {
	t.Helper()
	events.Init()
	eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck
	eventSystem.StartServiceWithPublisher(false)

	cpuOne := resources.NewResourceFromMap(map[string]resources.Quantity{
		"cpu": 1,
	}).ToProto()

	appEvent = &si.EventRecord{
		Type:              si.EventRecord_APP,
		TimestampNano:     100,
		Message:           "app event",
		EventChangeType:   si.EventRecord_ADD,
		EventChangeDetail: si.EventRecord_APP_ALLOC,
		ObjectID:          "app",
		ReferenceID:       "alloc",
		Resource:          cpuOne,
	}
	nodeEvent = &si.EventRecord{
		Type:              si.EventRecord_NODE,
		TimestampNano:     101,
		Message:           "node event",
		EventChangeType:   si.EventRecord_ADD,
		EventChangeDetail: si.EventRecord_APP_ALLOC,
		ObjectID:          "node",
		ReferenceID:       "alloc",
		Resource:          cpuOne,
	}
	// the queue event carries no resource
	queueEvent = &si.EventRecord{
		Type:              si.EventRecord_QUEUE,
		TimestampNano:     102,
		Message:           "queue event",
		EventChangeType:   si.EventRecord_REMOVE,
		EventChangeDetail: si.EventRecord_QUEUE_APP,
		ObjectID:          "root.default",
		ReferenceID:       "app",
	}

	// publish in the same order the records are returned
	for _, record := range []*si.EventRecord{appEvent, nodeEvent, queueEvent} {
		eventSystem.AddEvent(record)
	}

	var stored uint64
	waitErr := common.WaitFor(10*time.Millisecond, time.Second, func() bool {
		stored = eventSystem.Store.CountStoredEvents()
		return stored == 3
	})
	assert.NilError(t, waitErr, "wanted 3 events, got %d", stored)
	return appEvent, nodeEvent, queueEvent
}
// checkSingleEvent queries the batch endpoint with the given query string and
// checks that the result has a non-empty instance UUID and exactly one event
// record that matches event.
func checkSingleEvent(t *testing.T, event *si.EventRecord, query string) {
	t.Helper() // report failures at the caller's line, like the sibling check helpers
	req, err := http.NewRequest("GET", "/ws/v1/events/batch?"+query, strings.NewReader(""))
	assert.NilError(t, err)
	eventDao := getEventRecordDao(t, req)
	assert.Assert(t, eventDao.InstanceUUID != "")
	assert.Equal(t, 1, len(eventDao.EventRecords))
	compareEvents(t, event, eventDao.EventRecords[0])
}
// checkIllegalBatchRequest sends a batch request with the given query string
// and expects a 400 Bad Request whose message contains msg.
func checkIllegalBatchRequest(t *testing.T, query, msg string) {
	t.Helper()
	target := "/ws/v1/events/batch?" + query
	batchReq, reqErr := http.NewRequest("GET", target, strings.NewReader(""))
	assert.NilError(t, reqErr)
	readIllegalRequest(t, batchReq, http.StatusBadRequest, msg)
}
// readIllegalRequest serves req through getEvents, checks the status code and
// verifies that the YAPIError decoded from the first 256 bytes of the body
// has a message containing errMsg.
func readIllegalRequest(t *testing.T, req *http.Request, statusCode int, errMsg string) {
	t.Helper()
	recorder := httptest.NewRecorder()
	http.HandlerFunc(getEvents).ServeHTTP(recorder, req)
	assert.Equal(t, statusCode, recorder.Code)

	buf := make([]byte, 256)
	read, err := recorder.Body.Read(buf)
	assert.NilError(t, err, "cannot read response body")

	var apiErr dao.YAPIError
	err = json.Unmarshal(buf[:read], &apiErr)
	assert.NilError(t, err, "cannot unmarshal events dao")
	found := strings.Contains(apiErr.Message, errMsg)
	assert.Assert(t, found, "Error message [%s] not found inside the string: [%s]", errMsg, apiErr.Message)
}
// checkAllEvents fetches the full batch of events and compares the first
// len(events) returned records, position by position, with events.
func checkAllEvents(t *testing.T, events []*si.EventRecord) {
	t.Helper()
	req, err := http.NewRequest("GET", "/ws/v1/events/batch/", strings.NewReader(""))
	assert.NilError(t, err)
	eventDao := getEventRecordDao(t, req)
	// guard the indexing below: a short response fails the test instead of panicking
	assert.Assert(t, len(eventDao.EventRecords) >= len(events),
		"expected at least %d event records, got %d", len(events), len(eventDao.EventRecords))
	for i := range events {
		compareEvents(t, events[i], eventDao.EventRecords[i])
	}
}
// compareEvents checks that two event records agree on timestamp, change
// type, change detail, object ID, reference ID, message and resource.
func compareEvents(t *testing.T, event, eventFromDao *si.EventRecord) {
	t.Helper()
	want, got := event, eventFromDao
	assert.Equal(t, want.TimestampNano, got.TimestampNano)
	assert.Equal(t, want.EventChangeType, got.EventChangeType)
	assert.Equal(t, want.EventChangeDetail, got.EventChangeDetail)
	assert.Equal(t, want.ObjectID, got.ObjectID)
	assert.Equal(t, want.ReferenceID, got.ReferenceID)
	assert.Equal(t, want.Message, got.Message)

	// resources are converted from their proto form before being compared
	wantRes := resources.NewResourceFromProto(want.Resource)
	gotRes := resources.NewResourceFromProto(got.Resource)
	assert.Assert(t, resources.Equals(wantRes, gotRes))
}
// getEventRecordDao serves req through getEvents and decodes the first 2048
// bytes of the response body into a dao.EventRecordDAO.
func getEventRecordDao(t *testing.T, req *http.Request) dao.EventRecordDAO {
	t.Helper()
	recorder := httptest.NewRecorder()
	http.HandlerFunc(getEvents).ServeHTTP(recorder, req)

	buf := make([]byte, 2048)
	read, err := recorder.Body.Read(buf)
	assert.NilError(t, err, "cannot read response body")

	result := dao.EventRecordDAO{}
	err = json.Unmarshal(buf[:read], &result)
	assert.NilError(t, err, "cannot unmarshal events dao")
	return result
}
// prepareSchedulerContext creates a cluster context from configDefault, stores
// it in the package-level schedulerContext variable and returns it. The test
// fails if the context cannot be created or if it does not hold exactly one
// partition.
func prepareSchedulerContext(t *testing.T) *scheduler.ClusterContext {
	created, err := scheduler.NewClusterContext(rmID, policyGroup, []byte(configDefault))
	// publish the result before asserting, the handlers read the package variable
	schedulerContext = created
	assert.NilError(t, err, "Error when load clusterInfo from config")

	partitionCount := len(created.GetPartitionMapClone())
	assert.Equal(t, 1, partitionCount)
	return created
}
// prepareUserAndGroupContext resets the user manager, builds a partition from
// config through setup, and registers application "app-1" under
// "root.default" with an empty user and group. The application receives one
// ask for a single vcore which is allocated on "node-1"; it must report the
// running state afterwards. Finally the web app is created with the
// package-level schedulerContext.
func prepareUserAndGroupContext(t *testing.T, config string) {
	clearUserManager()
	part := setup(t, config, 1)

	// add 1 application, owned by an empty user and an empty group
	emptyUser := security.UserGroup{
		User:   "",
		Groups: []string{""},
	}
	app := addAppWithUserGroup(t, "app-1", part, "root.default", false, emptyUser)

	// add an ask for one vcore
	siAsk := &si.AllocationAsk{
		ApplicationID: "app-1",
		PartitionName: part.Name,
		ResourceAsk: &si.Resource{
			Resources: map[string]*si.Quantity{"vcore": {Value: 1}},
		},
	}
	ask := objects.NewAllocationAskFromSI(siAsk)
	assert.NilError(t, app.AddAllocationAsk(ask), "ask should have been added to app")

	// add an alloc for the ask
	app.AddAllocation(objects.NewAllocation("node-1", ask))
	assert.Assert(t, app.IsRunning(), "Application did not return running state after alloc: %s", app.CurrentState())

	NewWebApp(schedulerContext, nil)
}
// prepareEmptyUserGroupContext resets the user manager and creates the web app
// on top of a zero-value cluster context.
func prepareEmptyUserGroupContext() {
	clearUserManager()

	emptyContext := new(scheduler.ClusterContext)
	NewWebApp(emptyContext, nil)
}
// clearUserManager clears the user trackers and then the group trackers of
// the user manager returned by ugm.GetUserManager.
func clearUserManager() {
	manager := ugm.GetUserManager()

	// users first, groups second
	manager.ClearUserTrackers()
	manager.ClearGroupTrackers()
}
// verifyStateDumpJSON checks that the main sections of an aggregated state
// dump are populated: the timestamp is set and the partitions, nodes, cluster
// info, queues, log level, scheduler config partitions and extra config are
// all non-empty. It uses assert.Check, so every failing section is reported
// instead of stopping at the first one.
func verifyStateDumpJSON(t *testing.T, aggregated *AggregatedStateInfo) {
	// Mark as helper, like the other assertion helpers in this file, so that
	// failures are attributed to the calling test.
	t.Helper()
	assert.Check(t, aggregated.Timestamp != 0)
	assert.Check(t, len(aggregated.Partitions) > 0)
	assert.Check(t, len(aggregated.Nodes) > 0)
	assert.Check(t, len(aggregated.ClusterInfo) > 0)
	assert.Check(t, len(aggregated.Queues) > 0)
	assert.Check(t, len(aggregated.LogLevel) > 0)
	assert.Check(t, len(aggregated.Config.SchedulerConfig.Partitions) > 0)
	assert.Check(t, len(aggregated.Config.Extra) > 0)
}
// TestCheckHealthStatusNotFound verifies the health check handler when the web
// app runs on a zero-value cluster context: the error payload must carry the
// not-found status code and the "Health check is not available" message.
func TestCheckHealthStatusNotFound(t *testing.T) {
	NewWebApp(&scheduler.ClusterContext{}, nil)

	request, reqErr := http.NewRequest(http.MethodGet, "/ws/v1/scheduler/healthcheck", strings.NewReader(""))
	assert.NilError(t, reqErr, "Error while creating the healthcheck request")

	writer := &MockResponseWriter{}
	checkHealthStatus(writer, request)

	// the handler output is expected to be a serialized YAPIError
	var apiErr dao.YAPIError
	assert.NilError(t, json.Unmarshal(writer.outputBytes, &apiErr), unmarshalError)
	assert.Equal(t, http.StatusNotFound, apiErr.StatusCode, statusCodeError)
	assert.Equal(t, "Health check is not available", apiErr.Message, jsonMessageError)
}
func TestCheckHealthStatus(t *testing.T) {
runHealthCheckTest(t, &dao.SchedulerHealthDAOInfo{
Healthy: true,
HealthChecks: []dao.HealthCheckInfo{
{
Name: "Scheduling errors",
Succeeded: true,
Description: "Check for scheduling error entries in metrics",
DiagnosisMessage: "There were 0 scheduling errors logged in the metrics",
},
},
})
runHealthCheckTest(t, &dao.SchedulerHealthDAOInfo{
Healthy: false,
HealthChecks: []dao.HealthCheckInfo{
{
Name: "Failed nodes",
Succeeded: false,
Description: "Check for failed nodes entries in metrics",
DiagnosisMessage: "There were 1 failed nodes logged in the metrics",
},
},
})
}
// runHealthCheckTest stores expected as the last health check result of a
// fresh cluster context, creates the web app on that context, calls the health
// check handler and asserts that the decoded response matches expected: the
// overall Healthy flag, the number of checks, and the name, outcome,
// description and diagnosis message of every check.
func runHealthCheckTest(t *testing.T, expected *dao.SchedulerHealthDAOInfo) {
	clusterContext := &scheduler.ClusterContext{}
	clusterContext.SetLastHealthCheckResult(expected)
	NewWebApp(clusterContext, nil)

	request, err := http.NewRequest(http.MethodGet, "/ws/v1/scheduler/healthcheck", strings.NewReader(""))
	assert.NilError(t, err, "Error while creating the healthcheck request")

	writer := &MockResponseWriter{}
	checkHealthStatus(writer, request)

	var actual dao.SchedulerHealthDAOInfo
	err = json.Unmarshal(writer.outputBytes, &actual)
	assert.NilError(t, err, unmarshalError)

	assert.Equal(t, expected.Healthy, actual.Healthy)
	// the length check guards the indexed access in the loop below
	assert.Equal(t, len(expected.HealthChecks), len(actual.HealthChecks))
	for i := range expected.HealthChecks {
		want, got := expected.HealthChecks[i], actual.HealthChecks[i]
		assert.Equal(t, want.Name, got.Name)
		assert.Equal(t, want.Succeeded, got.Succeeded)
		assert.Equal(t, want.Description, got.Description)
		assert.Equal(t, want.DiagnosisMessage, got.DiagnosisMessage)
	}
}
// ResponseRecorderWithDeadline embeds an httptest.ResponseRecorder and adds
// SetWriteDeadline and SetReadDeadline methods whose failures can be switched
// on through the unexported fields below.
type ResponseRecorderWithDeadline struct {
*httptest.ResponseRecorder
// setWriteFails enables the injected SetWriteDeadline failure.
setWriteFails bool
// setWriteFailsAt is the SetWriteDeadline call number (counting from 1) that
// fails when setWriteFails is set.
setWriteFailsAt int
// setWriteCalls counts the SetWriteDeadline calls made so far.
setWriteCalls int
// setReadFails makes every SetReadDeadline call return an error.
setReadFails bool
}
// SetWriteDeadline counts the call and ignores the deadline. It returns an
// error only when setWriteFails is set and this is call number
// setWriteFailsAt; every other call returns nil.
func (rrd *ResponseRecorderWithDeadline) SetWriteDeadline(_ time.Time) error {
	rrd.setWriteCalls++

	failNow := rrd.setWriteFails && rrd.setWriteCalls == rrd.setWriteFailsAt
	if !failNow {
		return nil
	}
	return errors.New("SetWriteDeadline failed")
}
// SetReadDeadline ignores the deadline. It returns an error when setReadFails
// is set and nil otherwise.
func (rrd *ResponseRecorderWithDeadline) SetReadDeadline(_ time.Time) error {
	if !rrd.setReadFails {
		return nil
	}
	return errors.New("SetReadDeadline failed")
}
// NewResponseRecorderWithDeadline returns a ResponseRecorderWithDeadline that
// wraps a fresh httptest.ResponseRecorder, with no failures enabled.
func NewResponseRecorderWithDeadline() *ResponseRecorderWithDeadline {
	recorder := new(ResponseRecorderWithDeadline)
	recorder.ResponseRecorder = httptest.NewRecorder()
	return recorder
}