| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package webservice |
| |
| import ( |
| "crypto/sha256" |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "net/http/httptest" |
| "os" |
| "reflect" |
| "strings" |
| "testing" |
| "time" |
| |
| "github.com/gorilla/mux" |
| "github.com/prometheus/client_golang/prometheus/promhttp" |
| "gopkg.in/yaml.v2" |
| "gotest.tools/assert" |
| |
| "github.com/apache/yunikorn-core/pkg/common" |
| "github.com/apache/yunikorn-core/pkg/common/configs" |
| "github.com/apache/yunikorn-core/pkg/common/resources" |
| "github.com/apache/yunikorn-core/pkg/common/security" |
| "github.com/apache/yunikorn-core/pkg/metrics/history" |
| "github.com/apache/yunikorn-core/pkg/plugins" |
| "github.com/apache/yunikorn-core/pkg/scheduler" |
| "github.com/apache/yunikorn-core/pkg/scheduler/objects" |
| "github.com/apache/yunikorn-core/pkg/scheduler/tests" |
| "github.com/apache/yunikorn-core/pkg/webservice/dao" |
| siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" |
| "github.com/apache/yunikorn-scheduler-interface/lib/go/si" |
| ) |
| |
| const partitionNameWithoutClusterID = "default" |
| const normalizedPartitionName = "[rm-123]default" |
| const startConf = ` |
| partitions: |
| - name: default |
| nodesortpolicy: |
| type: fair |
| queues: |
| - name: root |
| properties: |
| first: "some value with spaces" |
| second: somethingElse |
| ` |
| const updatedConf = ` |
| partitions: |
| - name: default |
| nodesortpolicy: |
| type: binpacking |
| queues: |
| - name: root |
| properties: |
| first: "changedValue" |
| ` |
| |
| const baseConf = ` |
| partitions: |
| - name: default |
| nodesortpolicy: |
| type: fair |
| queues: |
| - name: root |
| submitacl: "*" |
| ` |
| |
| const invalidConf = ` |
| partitions: |
| - name: default |
| nodesortpolicy: |
| type: invalid |
| queues: |
| - name: root |
| ` |
| |
| const configDefault = ` |
| partitions: |
| - name: default |
| queues: |
| - name: root |
| submitacl: "*" |
| queues: |
| - name: default |
| - name: noapps |
| ` |
| |
| const configStateDumpFilePath = ` |
| partitions: |
| - name: default |
| statedumpfilepath: "tmp/non-default-yunikorn-state.txt" |
| queues: |
| - name: root |
| submitacl: "*" |
| queues: |
| - name: default |
| - name: noapps |
| ` |
| |
| const configMultiPartitions = ` |
| partitions: |
| - |
| name: gpu |
| queues: |
| - |
| name: root |
| - |
| name: default |
| nodesortpolicy: |
| type: fair |
| queues: |
| - |
| name: root |
| queues: |
| - |
| name: default |
| submitacl: "*" |
| ` |
| const configTwoLevelQueues = ` |
| partitions: |
| - |
| name: gpu |
| queues: |
| - |
| name: root |
| - |
| name: default |
| nodesortpolicy: |
| type: binpacking |
| queues: |
| - |
| name: root |
| properties: |
| application.sort.policy: stateaware |
| childtemplate: |
| properties: |
| application.sort.policy: stateaware |
| resources: |
| guaranteed: |
| memory: 400000 |
| max: |
| memory: 600000 |
| queues: |
| - |
| name: a |
| queues: |
| - |
| name: a1 |
| resources: |
| guaranteed: |
| memory: 500000 |
| vcore: 50000 |
| max: |
| memory: 800000 |
| vcore: 80000 |
| resources: |
| guaranteed: |
| memory: 500000 |
| vcore: 50000 |
| max: |
| memory: 800000 |
| vcore: 80000 |
| - |
| name: b |
| resources: |
| guaranteed: |
| memory: 400000 |
| vcore: 40000 |
| max: |
| memory: 600000 |
| vcore: 60000 |
| - |
| name: c |
| resources: |
| guaranteed: |
| memory: 100000 |
| vcore: 10000 |
| max: |
| memory: 100000 |
| vcore: 10000 |
| ` |
| |
| const rmID = "rm-123" |
| const policyGroup = "default-policy-group" |
| const queueName = "root.default" |
| const nodeID = "node-1" |
| |
// setup loads the given config, creates a new cluster context, verifies the partition count and returns the default partition
| func setup(t *testing.T, config string, partitionCount int) *scheduler.PartitionContext { |
| configs.MockSchedulerConfigByData([]byte(config)) |
| var err error |
| schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup) |
| assert.NilError(t, err, "Error when load clusterInfo from config") |
| |
| assert.Equal(t, partitionCount, len(schedulerContext.GetPartitionMapClone())) |
| |
| // Check default partition |
| partitionName := common.GetNormalizedPartitionName("default", rmID) |
| part := schedulerContext.GetPartition(partitionName) |
| assert.Equal(t, 0, len(part.GetApplications())) |
| return part |
| } |
| |
| // simple wrapper to make creating an app easier |
| func newApplication(appID, partitionName, queueName, rmID string, ugi security.UserGroup) *objects.Application { |
| siApp := &si.AddApplicationRequest{ |
| ApplicationID: appID, |
| QueueName: queueName, |
| PartitionName: partitionName, |
| } |
| return objects.NewApplication(siApp, ugi, nil, rmID) |
| } |
| |
| func TestValidateConf(t *testing.T) { |
| confTests := []struct { |
| content string |
| expectedResponse dao.ValidateConfResponse |
| }{ |
| { |
| content: baseConf, |
| expectedResponse: dao.ValidateConfResponse{ |
| Allowed: true, |
| Reason: "", |
| }, |
| }, |
| { |
| content: invalidConf, |
| expectedResponse: dao.ValidateConfResponse{ |
| Allowed: false, |
| Reason: "undefined policy: invalid", |
| }, |
| }, |
| } |
| for _, test := range confTests { |
| // No err check: new request always returns correctly |
| //nolint: errcheck |
| req, _ := http.NewRequest("POST", "", strings.NewReader(test.content)) |
| resp := &MockResponseWriter{} |
| validateConf(resp, req) |
| var vcr dao.ValidateConfResponse |
| err := json.Unmarshal(resp.outputBytes, &vcr) |
| assert.NilError(t, err, "failed to unmarshal ValidateConfResponse from response body") |
| assert.Equal(t, vcr.Allowed, test.expectedResponse.Allowed, "allowed flag incorrect") |
| assert.Equal(t, vcr.Reason, test.expectedResponse.Reason, "response text not as expected") |
| } |
| } |
| |
| func TestApplicationHistory(t *testing.T) { |
| // make sure the history is nil when we finish this test |
| defer ResetIMHistory() |
| // No err check: new request always returns correctly |
| //nolint: errcheck |
| req, _ := http.NewRequest("GET", "", strings.NewReader("")) |
| resp := &MockResponseWriter{} |
// history not initialized: the handler should return a not-implemented error
| getApplicationHistory(resp, req) |
| |
| var errInfo dao.YAPIError |
| err := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err, "failed to unmarshal app history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, http.StatusNotImplemented, resp.statusCode, "app history handler returned wrong status") |
| assert.Equal(t, errInfo.Message, "Internal metrics collection is not enabled.", "JSON error message is incorrect") |
| assert.Equal(t, errInfo.StatusCode, http.StatusNotImplemented) |
| |
// after init the history is still empty, so no records should be returned
| imHistory = history.NewInternalMetricsHistory(5) |
| resp = &MockResponseWriter{} |
| getApplicationHistory(resp, req) |
| var appHist []dao.ApplicationHistoryDAOInfo |
| err = json.Unmarshal(resp.outputBytes, &appHist) |
| assert.NilError(t, err, "failed to unmarshal app history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, resp.statusCode, 0, "app response should have no status") |
| assert.Equal(t, len(appHist), 0, "empty response must have no records") |
| |
| // add new history records |
| imHistory.Store(1, 0) |
| imHistory.Store(2, 0) |
| imHistory.Store(30, 0) |
| resp = &MockResponseWriter{} |
| getApplicationHistory(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &appHist) |
| assert.NilError(t, err, "failed to unmarshal app history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, resp.statusCode, 0, "app response should have no status") |
| assert.Equal(t, len(appHist), 3, "incorrect number of records returned") |
| assert.Equal(t, appHist[0].TotalApplications, "1", "metric 1 should be 1 apps and was not") |
| assert.Equal(t, appHist[2].TotalApplications, "30", "metric 3 should be 30 apps and was not") |
| |
// add new history records to roll over the limit
| // this gives us a list of (oldest to newest): 2, 30, 40, 50, 300 |
| imHistory.Store(40, 0) |
| imHistory.Store(50, 0) |
| imHistory.Store(300, 0) |
| resp = &MockResponseWriter{} |
| getApplicationHistory(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &appHist) |
| assert.NilError(t, err, "failed to unmarshal app history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, resp.statusCode, 0, "app response should have no status") |
| assert.Equal(t, len(appHist), 5, "incorrect number of records returned") |
| assert.Equal(t, appHist[0].TotalApplications, "2", "metric 1 should be 1 apps and was not") |
| assert.Equal(t, appHist[4].TotalApplications, "300", "metric 5 should be 300 apps and was not") |
| } |
| |
| func TestContainerHistory(t *testing.T) { |
| // make sure the history is nil when we finish this test |
| defer ResetIMHistory() |
| // No err check: new request always returns correctly |
| //nolint: errcheck |
| req, _ := http.NewRequest("GET", "", strings.NewReader("")) |
| resp := &MockResponseWriter{} |
// history not initialized: the handler should return a not-implemented error
| getContainerHistory(resp, req) |
| |
| var errInfo dao.YAPIError |
| err := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err, "failed to unmarshal container history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, http.StatusNotImplemented, resp.statusCode, "container history handler returned wrong status") |
| assert.Equal(t, errInfo.Message, "Internal metrics collection is not enabled.", "JSON error message is incorrect") |
| assert.Equal(t, errInfo.StatusCode, http.StatusNotImplemented) |
| |
// after init the history is still empty, so no records should be returned
| imHistory = history.NewInternalMetricsHistory(5) |
| resp = &MockResponseWriter{} |
| getContainerHistory(resp, req) |
| var contHist []dao.ContainerHistoryDAOInfo |
| err = json.Unmarshal(resp.outputBytes, &contHist) |
| assert.NilError(t, err, "failed to unmarshal container history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, resp.statusCode, 0, "container response should have no status") |
| assert.Equal(t, len(contHist), 0, "empty response must have no records") |
| |
| // add new history records |
| imHistory.Store(0, 1) |
| imHistory.Store(0, 2) |
| imHistory.Store(0, 30) |
| resp = &MockResponseWriter{} |
| getContainerHistory(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &contHist) |
| assert.NilError(t, err, "failed to unmarshal container history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, resp.statusCode, 0, "container response should have no status") |
| assert.Equal(t, len(contHist), 3, "incorrect number of records returned") |
| assert.Equal(t, contHist[0].TotalContainers, "1", "metric 1 should be 1 apps and was not") |
| assert.Equal(t, contHist[2].TotalContainers, "30", "metric 3 should be 30 apps and was not") |
| |
// add new history records to roll over the limit
| // this gives us a list of (oldest to newest): 2, 30, 40, 50, 300 |
| imHistory.Store(0, 40) |
| imHistory.Store(0, 50) |
| imHistory.Store(0, 300) |
| resp = &MockResponseWriter{} |
| getContainerHistory(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &contHist) |
| assert.NilError(t, err, "failed to unmarshal container history response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, resp.statusCode, 0, "container response should have no status") |
| assert.Equal(t, len(contHist), 5, "incorrect number of records returned") |
| assert.Equal(t, contHist[0].TotalContainers, "2", "metric 1 should be 1 apps and was not") |
| assert.Equal(t, contHist[4].TotalContainers, "300", "metric 5 should be 300 apps and was not") |
| } |
| |
| func TestGetConfigYAML(t *testing.T) { |
| configs.MockSchedulerConfigByData([]byte(startConf)) |
| var err error |
| schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup) |
| assert.NilError(t, err, "Error when load clusterInfo from config") |
| // No err check: new request always returns correctly |
| //nolint: errcheck |
| req, _ := http.NewRequest("GET", "", nil) |
| resp := &MockResponseWriter{} |
| getClusterConfig(resp, req) |
// yaml unmarshal handles the checksum at the end automatically in this implementation
| conf := &configs.SchedulerConfig{} |
| err = yaml.Unmarshal(resp.outputBytes, conf) |
| assert.NilError(t, err, "failed to unmarshal config from response body") |
| assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "fair", "node sort policy set incorrectly, not fair") |
| |
| startConfSum := conf.Checksum |
assert.Assert(t, len(startConfSum) > 0, "checksum not found in returned config")
| |
| // change the config |
| configs.MockSchedulerConfigByData([]byte(updatedConf)) |
| err = schedulerContext.UpdateRMSchedulerConfig(rmID) |
| assert.NilError(t, err, "Error when updating clusterInfo from config") |
// check that yaml is returned by default, even for an unknown accept header: unmarshal will error when it is not
| req.Header.Set("Accept", "unknown") |
| getClusterConfig(resp, req) |
| err = yaml.Unmarshal(resp.outputBytes, conf) |
| assert.NilError(t, err, "failed to unmarshal config from response body (updated config)") |
| assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "binpacking", "node sort policy not updated") |
| assert.Assert(t, startConfSum != conf.Checksum, "checksums did not change in output") |
| } |
| |
| func TestGetConfigJSON(t *testing.T) { |
| setup(t, startConf, 1) |
| // No err check: new request always returns correctly |
| //nolint: errcheck |
| req, _ := http.NewRequest("GET", "", nil) |
| req.Header.Set("Accept", "application/json") |
| resp := &MockResponseWriter{} |
| getClusterConfig(resp, req) |
| |
| conf := &configs.SchedulerConfig{} |
| err := json.Unmarshal(resp.outputBytes, conf) |
| assert.NilError(t, err, "failed to unmarshal config from response body (json)") |
| startConfSum := conf.Checksum |
| assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "fair", "node sort policy set incorrectly, not fair (json)") |
| |
| // change the config |
| configs.MockSchedulerConfigByData([]byte(updatedConf)) |
| err = schedulerContext.UpdateRMSchedulerConfig(rmID) |
| assert.NilError(t, err, "Error when updating clusterInfo from config") |
| |
| getClusterConfig(resp, req) |
| err = json.Unmarshal(resp.outputBytes, conf) |
| assert.NilError(t, err, "failed to unmarshal config from response body (json, updated config)") |
| assert.Assert(t, startConfSum != conf.Checksum, "checksums did not change in json output: %s, %s", startConfSum, conf.Checksum) |
| assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "binpacking", "node sort policy not updated (json)") |
| } |
| |
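// FakeConfigPlugin mocks the resource manager callback used for configuration updates:
// it fails the update when generateError is set, otherwise it reports success and
// returns startConf as the old configuration.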
| type FakeConfigPlugin struct { |
| tests.MockResourceManagerCallback |
| generateError bool |
| } |
| |
| func (f FakeConfigPlugin) UpdateConfiguration(args *si.UpdateConfigurationRequest) *si.UpdateConfigurationResponse { |
| if f.generateError { |
| return &si.UpdateConfigurationResponse{ |
| Success: false, |
| Reason: "configuration update error", |
| } |
| } |
| return &si.UpdateConfigurationResponse{ |
| Success: true, |
| OldConfig: startConf, |
| } |
| } |
| |
| func TestSaveConfigMapNoError(t *testing.T) { |
| plugins.RegisterSchedulerPlugin(&FakeConfigPlugin{generateError: false}) |
| oldConf, err := updateConfiguration(updatedConf) |
| assert.NilError(t, err, "No error expected") |
| assert.Equal(t, oldConf, startConf, " Wrong returned configuration") |
| } |
| |
| func TestSaveConfigMapErrorExpected(t *testing.T) { |
| plugins.RegisterSchedulerPlugin(&FakeConfigPlugin{generateError: true}) |
| oldConf, err := updateConfiguration(updatedConf) |
| assert.Assert(t, err != nil, "Missing expected error") |
| assert.Equal(t, oldConf, "", " Wrong returned configuration") |
| } |
| |
| func TestBuildUpdateResponseSuccess(t *testing.T) { |
| resp := &MockResponseWriter{} |
| buildUpdateResponse(nil, resp) |
| assert.Equal(t, http.StatusOK, resp.statusCode, "Response should be OK") |
| } |
| |
| func TestBuildUpdateResponseFailure(t *testing.T) { |
| resp := &MockResponseWriter{} |
| err := fmt.Errorf("ConfigMapUpdate failed") |
| buildUpdateResponse(err, resp) |
| |
| var errInfo dao.YAPIError |
| err1 := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err1, "failed to unmarshal updateconfig dao response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, http.StatusConflict, resp.statusCode, "Status code is wrong") |
| assert.Assert(t, strings.Contains(string(errInfo.Message), err.Error()), "Error message should contain the reason") |
| assert.Equal(t, errInfo.StatusCode, http.StatusConflict) |
| } |
| |
| func TestUpdateConfig(t *testing.T) { |
| prepareSchedulerForConfigChange(t) |
| resp := &MockResponseWriter{} |
| baseChecksum := configs.ConfigContext.Get(schedulerContext.GetPolicyGroup()).Checksum |
| conf := appendChecksum(updatedConf, baseChecksum) |
| req, err := http.NewRequest("PUT", "", strings.NewReader(conf)) |
| assert.NilError(t, err, "Failed to create the request") |
| updateClusterConfig(resp, req) |
| assert.Equal(t, http.StatusOK, resp.statusCode, "No error expected") |
| } |
| |
| func TestUpdateConfigInvalidConf(t *testing.T) { |
| prepareSchedulerForConfigChange(t) |
| resp := &MockResponseWriter{} |
| req, err := http.NewRequest("PUT", "", strings.NewReader(invalidConf)) |
| assert.NilError(t, err, "Failed to create the request") |
| updateClusterConfig(resp, req) |
| |
| var errInfo dao.YAPIError |
| err1 := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err1, "failed to unmarshal updateconfig dao response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, http.StatusConflict, resp.statusCode, "Status code is wrong") |
| assert.Assert(t, len(string(errInfo.Message)) > 0, "Error message is expected") |
| assert.Equal(t, errInfo.StatusCode, http.StatusConflict) |
| } |
| |
| func TestUpdateConfigWrongChecksum(t *testing.T) { |
| prepareSchedulerForConfigChange(t) |
| resp := &MockResponseWriter{} |
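// compute the checksum over the updated config: it will not match the checksum of the
// currently loaded startConf, so the update must be rejected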
| baseChecksum := fmt.Sprintf("%X", sha256.Sum256([]byte(updatedConf))) |
| conf := appendChecksum(updatedConf, baseChecksum) |
| req, err := http.NewRequest("PUT", "", strings.NewReader(conf)) |
| assert.NilError(t, err, "Failed to create the request") |
| updateClusterConfig(resp, req) |
| |
| var errInfo dao.YAPIError |
| err1 := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err1, "failed to unmarshal updateconfig dao response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, http.StatusConflict, resp.statusCode, "Status code is wrong") |
| assert.Assert(t, strings.Contains(string(errInfo.Message), "the base configuration is changed"), "Wrong error message received") |
| assert.Equal(t, errInfo.StatusCode, http.StatusConflict) |
| } |
| |
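// appendChecksum appends a checksum line to the configuration; updateClusterConfig
// compares this checksum against the currently loaded configuration to verify that
// the update is based on it.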
| func appendChecksum(conf string, checksum string) string { |
| conf += "checksum: " + checksum |
| return conf |
| } |
| |
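// prepareSchedulerForConfigChange registers a config plugin that always succeeds and
// loads the start configuration into a fresh cluster context.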
| func prepareSchedulerForConfigChange(t *testing.T) { |
| plugins.RegisterSchedulerPlugin(&FakeConfigPlugin{generateError: false}) |
| configs.MockSchedulerConfigByData([]byte(startConf)) |
| var err error |
| schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup) |
| assert.NilError(t, err, "Error when load clusterInfo from config") |
| } |
| |
| func TestGetClusterUtilJSON(t *testing.T) { |
| setup(t, configDefault, 1) |
| |
| // check build information of RM |
| buildInfoMap := make(map[string]string) |
| buildInfoMap["buildDate"] = "2006-01-02T15:04:05-0700" |
| buildInfoMap["buildVersion"] = "latest" |
| buildInfoMap["isPluginVersion"] = "false" |
| schedulerContext.SetRMInfo(rmID, buildInfoMap) |
| rmInfo := schedulerContext.GetRMInfoMapClone() |
| assert.Equal(t, 1, len(rmInfo)) |
| rmBuildInformationMaps := getRMBuildInformation(rmInfo) |
| assert.Equal(t, 1, len(rmBuildInformationMaps)) |
| assert.Equal(t, rmBuildInformationMaps[0]["buildDate"], buildInfoMap["buildDate"]) |
| assert.Equal(t, rmBuildInformationMaps[0]["buildVersion"], buildInfoMap["buildVersion"]) |
| assert.Equal(t, rmBuildInformationMaps[0]["isPluginVersion"], buildInfoMap["isPluginVersion"]) |
| assert.Equal(t, rmBuildInformationMaps[0]["rmId"], rmID) |
| |
// check the default test partition
| partitionName := common.GetNormalizedPartitionName("default", rmID) |
| partition := schedulerContext.GetPartition(partitionName) |
| assert.Equal(t, partitionName, partition.Name) |
// add a new app to the partition
| appID := "appID-1" |
| app := newApplication(appID, partitionName, queueName, rmID, security.UserGroup{}) |
| err := partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
// expected entry for the case where total and allocated resources are undefined
| utilZero := &dao.ClusterUtilDAOInfo{ |
| ResourceType: "N/A", |
| Total: int64(-1), |
| Used: int64(-1), |
| Usage: "N/A", |
| } |
| result0 := getClusterUtilJSON(partition) |
| assert.Equal(t, ContainsObj(result0, utilZero), true) |
| |
| // add node to partition with allocations |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}).ToProto() |
| node1 := objects.NewNode(&si.NodeInfo{NodeID: nodeID, SchedulableResource: nodeRes}) |
| |
| resAlloc1 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300}) |
| resAlloc2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 200}) |
| ask1 := objects.NewAllocationAsk("alloc-1", appID, resAlloc1) |
| ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2) |
| alloc1 := objects.NewAllocation("alloc-1-uuid", nodeID, ask1) |
| alloc2 := objects.NewAllocation("alloc-2-uuid", nodeID, ask2) |
| allocs := []*objects.Allocation{alloc1, alloc2} |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| |
| // set expected result |
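// memory: 500 + 300 = 800 used out of 1000 (80%), vcore: 300 + 200 = 500 used out of 1000 (50%)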
| utilMem := &dao.ClusterUtilDAOInfo{ |
| ResourceType: siCommon.Memory, |
| Total: int64(1000), |
| Used: int64(800), |
| Usage: "80%", |
| } |
| utilCore := &dao.ClusterUtilDAOInfo{ |
| ResourceType: siCommon.CPU, |
| Total: int64(1000), |
| Used: int64(500), |
| Usage: "50%", |
| } |
// check whether the result matches the expected values
| result := getClusterUtilJSON(partition) |
| assert.Equal(t, ContainsObj(result, utilMem), true) |
| assert.Equal(t, ContainsObj(result, utilCore), true) |
| } |
| |
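// ContainsObj reports whether the slice contains the given object, comparing by
// equality, reflect.DeepEqual and finally by formatted (%#v) representation.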
| func ContainsObj(slice interface{}, contains interface{}) bool { |
| value := reflect.ValueOf(slice) |
| for i := 0; i < value.Len(); i++ { |
| if value.Index(i).Interface() == contains { |
| return true |
| } |
| if reflect.DeepEqual(value.Index(i).Interface(), contains) { |
| return true |
| } |
| if fmt.Sprintf("%#v", value.Index(i).Interface()) == fmt.Sprintf("%#v", contains) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func TestGetNodesUtilJSON(t *testing.T) { |
| partition := setup(t, configDefault, 1) |
| |
| // create test application |
| appID := "app1" |
| app := newApplication(appID, partition.Name, queueName, rmID, security.UserGroup{}) |
| err := partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // create test nodes |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}).ToProto() |
| nodeRes2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000, "GPU": 10}).ToProto() |
| node1ID := "node-1" |
| node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, SchedulableResource: nodeRes}) |
| node2ID := "node-2" |
| node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, SchedulableResource: nodeRes2}) |
| |
| // create test allocations |
| resAlloc1 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300}) |
| resAlloc2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500, "GPU": 5}) |
| ask1 := objects.NewAllocationAsk("alloc-1", appID, resAlloc1) |
| ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2) |
| allocs := []*objects.Allocation{objects.NewAllocation("alloc-1-uuid", node1ID, ask1)} |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| allocs = []*objects.Allocation{objects.NewAllocation("alloc-2-uuid", node2ID, ask2)} |
| err = partition.AddNode(node2, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| |
| // get nodes utilization |
| res1 := getNodesUtilJSON(partition, siCommon.Memory) |
| res2 := getNodesUtilJSON(partition, siCommon.CPU) |
| res3 := getNodesUtilJSON(partition, "GPU") |
| resNon := getNodesUtilJSON(partition, "non-exist") |
| subres1 := res1.NodesUtil |
| subres2 := res2.NodesUtil |
| subres3 := res3.NodesUtil |
| subresNon := resNon.NodesUtil |
| |
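// nodes are grouped into utilization buckets: with these allocations node2 (30% memory used)
// lands in bucket index 2 and node1 (50% memory used) in bucket index 4, and the reverse for vcore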
| assert.Equal(t, res1.ResourceType, siCommon.Memory) |
| assert.Equal(t, subres1[2].NumOfNodes, int64(1)) |
| assert.Equal(t, subres1[4].NumOfNodes, int64(1)) |
| assert.Equal(t, subres1[2].NodeNames[0], node2ID) |
| assert.Equal(t, subres1[4].NodeNames[0], node1ID) |
| |
| assert.Equal(t, res2.ResourceType, siCommon.CPU) |
| assert.Equal(t, subres2[2].NumOfNodes, int64(1)) |
| assert.Equal(t, subres2[4].NumOfNodes, int64(1)) |
| assert.Equal(t, subres2[2].NodeNames[0], node1ID) |
| assert.Equal(t, subres2[4].NodeNames[0], node2ID) |
| |
| assert.Equal(t, res3.ResourceType, "GPU") |
| assert.Equal(t, subres3[4].NumOfNodes, int64(1)) |
| assert.Equal(t, subres3[4].NodeNames[0], node2ID) |
| |
| assert.Equal(t, resNon.ResourceType, "non-exist") |
| assert.Equal(t, subresNon[0].NumOfNodes, int64(0)) |
| assert.Equal(t, len(subresNon[0].NodeNames), 0) |
| } |
| |
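// addAndConfirmApplicationExists adds a new application to the partition and confirms
// that it was added in the New state.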
| func addAndConfirmApplicationExists(t *testing.T, partitionName string, partition *scheduler.PartitionContext, appName string) *objects.Application { |
| // add a new app |
| app := newApplication(appName, partitionName, "root.default", rmID, security.UserGroup{}) |
| err := partition.AddApplication(app) |
| assert.NilError(t, err, "Failed to add Application to Partition.") |
| assert.Equal(t, app.CurrentState(), objects.New.String()) |
| return app |
| } |
| |
| func TestPartitions(t *testing.T) { |
| defaultPartition := setup(t, configMultiPartitions, 2) |
| partitionName := defaultPartition.Name |
| |
| // add a new app |
| addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-0") |
| |
| // add a new app1 - accepted |
| app1 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-1") |
| app1.SetState(objects.Accepted.String()) |
| |
| // add a new app2 - starting |
| app2 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-2") |
| app2.SetState(objects.Starting.String()) |
| |
| // add a new app3 - running |
| app3 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-3") |
| app3.SetState(objects.Running.String()) |
| |
| // add a new app4 - completing |
| app4 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-4") |
| app4.SetState(objects.Completing.String()) |
| |
| // add a new app5 - rejected |
| app5 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-5") |
| app5.SetState(objects.Rejected.String()) |
| |
| // add a new app6 - completed |
| app6 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-6") |
| app6.SetState(objects.Completed.String()) |
| |
| // add a new app7 - failed |
| app7 := addAndConfirmApplicationExists(t, partitionName, defaultPartition, "app-7") |
| app7.SetState(objects.Failed.String()) |
| |
| NewWebApp(schedulerContext, nil) |
| |
| // create test nodes |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 500}).ToProto() |
| node1ID := "node-1" |
| node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, SchedulableResource: nodeRes}) |
| node2ID := "node-2" |
| node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, SchedulableResource: nodeRes}) |
| |
| // create test allocations |
| resAlloc1 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 100, siCommon.CPU: 400}) |
| resAlloc2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 200, siCommon.CPU: 300}) |
| ask1 := objects.NewAllocationAsk("alloc-1", app6.ApplicationID, resAlloc1) |
| ask2 := objects.NewAllocationAsk("alloc-2", app3.ApplicationID, resAlloc2) |
| allocs := []*objects.Allocation{objects.NewAllocation("alloc-1-uuid", node1ID, ask1)} |
| err := defaultPartition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| allocs = []*objects.Allocation{objects.NewAllocation("alloc-2-uuid", node2ID, ask2)} |
| err = defaultPartition.AddNode(node2, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| |
| var req *http.Request |
| req, err = http.NewRequest("GET", "/ws/v1/partitions", strings.NewReader("")) |
| assert.NilError(t, err, "App Handler request failed") |
| resp := &MockResponseWriter{} |
| var partitionInfo []*dao.PartitionInfo |
| getPartitions(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &partitionInfo) |
| assert.NilError(t, err, "failed to unmarshal applications dao response from response body: %s", string(resp.outputBytes)) |
| |
| cs := make(map[string]*dao.PartitionInfo, 2) |
| for _, d := range partitionInfo { |
| cs[d.Name] = d |
| } |
| |
| assert.Assert(t, cs["default"] != nil) |
| assert.Equal(t, cs["default"].ClusterID, "rm-123") |
| assert.Equal(t, cs["default"].Name, "default") |
| assert.Equal(t, cs["default"].NodeSortingPolicy.Type, "fair") |
| assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["vcore"], 1.0) |
| assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["memory"], 1.0) |
| assert.Equal(t, cs["default"].Applications["total"], 8) |
| assert.Equal(t, cs["default"].Applications[objects.New.String()], 1) |
| assert.Equal(t, cs["default"].Applications[objects.Accepted.String()], 1) |
| assert.Equal(t, cs["default"].Applications[objects.Starting.String()], 1) |
| assert.Equal(t, cs["default"].Applications[objects.Running.String()], 1) |
| assert.Equal(t, cs["default"].Applications[objects.Completing.String()], 1) |
| assert.Equal(t, cs["default"].Applications[objects.Rejected.String()], 1) |
| assert.Equal(t, cs["default"].Applications[objects.Completed.String()], 1) |
| assert.Equal(t, cs["default"].Applications[objects.Failed.String()], 1) |
| assert.DeepEqual(t, cs["default"].Capacity.Capacity, map[string]int64{"memory": 1000, "vcore": 1000}) |
| assert.DeepEqual(t, cs["default"].Capacity.UsedCapacity, map[string]int64{"memory": 300, "vcore": 700}) |
| assert.DeepEqual(t, cs["default"].Capacity.Utilization, map[string]int64{"memory": 30, "vcore": 70}) |
| assert.Equal(t, cs["default"].State, "Active") |
| |
| assert.Assert(t, cs["gpu"] != nil) |
| assert.Equal(t, cs["gpu"].ClusterID, "rm-123") |
| assert.Equal(t, cs["gpu"].Name, "gpu") |
| assert.Equal(t, cs["default"].NodeSortingPolicy.Type, "fair") |
| assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["vcore"], 1.0) |
| assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["memory"], 1.0) |
| assert.Equal(t, cs["gpu"].Applications["total"], 0) |
| } |
| |
| func TestCreateClusterConfig(t *testing.T) { |
| confTests := []struct { |
| content string |
| expectedResponse dao.ValidateConfResponse |
| }{ |
| { |
| content: baseConf, |
| expectedResponse: dao.ValidateConfResponse{ |
| Allowed: true, |
| Reason: "", |
| }, |
| }, |
| { |
| content: invalidConf, |
| expectedResponse: dao.ValidateConfResponse{ |
| Allowed: false, |
| Reason: "undefined policy: invalid", |
| }, |
| }, |
| } |
| |
| configs.MockSchedulerConfigByData([]byte(configDefault)) |
| var err error |
| schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup) |
| assert.NilError(t, err, "Error when load clusterInfo from config") |
| for _, test := range confTests { |
| // No err check: new request always returns correctly |
| //nolint: errcheck |
| req, _ := http.NewRequest("POST", "/ws/v1/config?dry_run=1", strings.NewReader(test.content)) |
| rr := httptest.NewRecorder() |
confHandler := http.HandlerFunc(createClusterConfig)
handler := loggingHandler(confHandler, "/ws/v1/config")
handler.ServeHTTP(rr, req)
| var vcr dao.ValidateConfResponse |
| err = json.Unmarshal(rr.Body.Bytes(), &vcr) |
| assert.Equal(t, http.StatusOK, rr.Result().StatusCode, "Incorrect Status code") |
| assert.NilError(t, err, "failed to unmarshal ValidateConfResponse from response body") |
| assert.Equal(t, vcr.Allowed, test.expectedResponse.Allowed, "allowed flag incorrect") |
| assert.Equal(t, vcr.Reason, test.expectedResponse.Reason, "response text not as expected") |
| } |
| |
| // When "dry_run" is not passed |
| //nolint: errcheck |
| req, err := http.NewRequest("POST", "/ws/v1/config", nil) |
| assert.NilError(t, err, "Problem in creating the request") |
| rr := httptest.NewRecorder() |
confHandler := http.HandlerFunc(createClusterConfig)
handler := loggingHandler(confHandler, "/ws/v1/config")
handler.ServeHTTP(rr, req)
| |
| var errInfo dao.YAPIError |
| err = json.Unmarshal(rr.Body.Bytes(), &errInfo) |
| assert.NilError(t, err, "failed to unmarshal ValidateConfResponse from response body") |
| assert.Equal(t, http.StatusBadRequest, rr.Result().StatusCode, "Incorrect Status code") |
| assert.Equal(t, errInfo.Message, "Dry run param is missing. Please check the usage documentation", "JSON error message is incorrect") |
| assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest) |
| |
| // When "dry_run" value is invalid |
| //nolint: errcheck |
| req, err = http.NewRequest("POST", "/ws/v1/config?dry_run=0", nil) |
| assert.NilError(t, err, "Problem in creating the request") |
| rr = httptest.NewRecorder() |
confHandler = http.HandlerFunc(createClusterConfig)
handler = loggingHandler(confHandler, "/ws/v1/config?dry_run=0")
handler.ServeHTTP(rr, req)
| err = json.Unmarshal(rr.Body.Bytes(), &errInfo) |
| assert.NilError(t, err, "failed to unmarshal ValidateConfResponse from response body") |
| assert.Equal(t, http.StatusBadRequest, rr.Result().StatusCode, "Incorrect Status code") |
| assert.Equal(t, errInfo.Message, "Invalid \"dry_run\" query param. Currently, only dry_run=1 is supported. Please check the usage documentation", "JSON error message is incorrect") |
| assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest) |
| } |
| |
| func TestMetricsNotEmpty(t *testing.T) { |
| req, err := http.NewRequest("GET", "/ws/v1/metrics", strings.NewReader("")) |
| assert.NilError(t, err, "Error while creating the request") |
| rr := httptest.NewRecorder() |
metricsHandler := http.HandlerFunc(promhttp.Handler().ServeHTTP)
handler := loggingHandler(metricsHandler, "/ws/v1/metrics")
handler.ServeHTTP(rr, req)
| assert.Assert(t, len(rr.Body.Bytes()) > 0, "Metrics response should not be empty") |
| } |
| |
| func TestGetPartitionQueuesHandler(t *testing.T) { |
| setup(t, configTwoLevelQueues, 2) |
| |
| NewWebApp(schedulerContext, nil) |
| |
| var req *http.Request |
| req, err := http.NewRequest("GET", "/ws/v1/partition/default/queues", strings.NewReader("")) |
| vars := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| } |
| req = mux.SetURLVars(req, vars) |
| assert.NilError(t, err, "Get Queues for PartitionQueues Handler request failed") |
| resp := &MockResponseWriter{} |
| var partitionQueuesDao dao.PartitionQueueDAOInfo |
| getPartitionQueues(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &partitionQueuesDao) |
| assert.NilError(t, err, "failed to unmarshal PartitionQueues dao response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, partitionQueuesDao.Children[0].Parent, "root") |
| assert.Equal(t, partitionQueuesDao.Children[1].Parent, "root") |
| assert.Equal(t, partitionQueuesDao.Children[2].Parent, "root") |
| assert.Equal(t, len(partitionQueuesDao.Properties), 1) |
| assert.Equal(t, partitionQueuesDao.Properties["application.sort.policy"], "stateaware") |
| assert.Equal(t, len(partitionQueuesDao.TemplateInfo.Properties), 1) |
| assert.Equal(t, partitionQueuesDao.TemplateInfo.Properties["application.sort.policy"], "stateaware") |
| |
| maxResourcesConf := make(map[string]string) |
| maxResourcesConf["memory"] = "600000" |
| maxResource, err := resources.NewResourceFromConf(maxResourcesConf) |
| assert.NilError(t, err) |
| assert.DeepEqual(t, partitionQueuesDao.TemplateInfo.MaxResource, maxResource.DAOMap()) |
| |
| guaranteedResourcesConf := make(map[string]string) |
| guaranteedResourcesConf["memory"] = "400000" |
| guaranteedResources, err := resources.NewResourceFromConf(guaranteedResourcesConf) |
| assert.NilError(t, err) |
| assert.DeepEqual(t, partitionQueuesDao.TemplateInfo.GuaranteedResource, guaranteedResources.DAOMap()) |
| |
// partition does not exist
| req, err = http.NewRequest("GET", "/ws/v1/partition/default/queues", strings.NewReader("")) |
| vars = map[string]string{ |
| "partition": "notexists", |
| } |
| req = mux.SetURLVars(req, vars) |
| assert.NilError(t, err, "Get Queues for PartitionQueues Handler request failed") |
| resp = &MockResponseWriter{} |
| getPartitionQueues(resp, req) |
| assertPartitionExists(t, resp) |
| } |
| |
| func TestGetClusterInfo(t *testing.T) { |
| setup(t, configTwoLevelQueues, 2) |
| |
| resp := &MockResponseWriter{} |
| getClusterInfo(resp, nil) |
| var data []*dao.ClusterDAOInfo |
| err := json.Unmarshal(resp.outputBytes, &data) |
| assert.NilError(t, err) |
| assert.Equal(t, 2, len(data)) |
| |
| cs := make(map[string]*dao.ClusterDAOInfo, 2) |
| for _, d := range data { |
| cs[d.PartitionName] = d |
| } |
| |
| assert.Assert(t, cs["default"] != nil) |
| assert.Assert(t, cs["gpu"] != nil) |
| } |
| |
| func TestGetPartitionNodes(t *testing.T) { |
| partition := setup(t, configDefault, 1) |
| |
| // create test application |
| appID := "app1" |
| app := newApplication(appID, partition.Name, queueName, rmID, security.UserGroup{}) |
| err := partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // create test nodes |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}).ToProto() |
| node1ID := "node-1" |
| node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, SchedulableResource: nodeRes}) |
| node2ID := "node-2" |
| node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, SchedulableResource: nodeRes}) |
| |
| // create test allocations |
| resAlloc1 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300}) |
| resAlloc2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500}) |
| ask1 := objects.NewAllocationAsk("alloc-1", appID, resAlloc1) |
| ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2) |
| allocs := []*objects.Allocation{objects.NewAllocation("alloc-1-uuid", node1ID, ask1)} |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| allocs = []*objects.Allocation{objects.NewAllocation("alloc-2-uuid", node2ID, ask2)} |
| err = partition.AddNode(node2, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| |
| NewWebApp(schedulerContext, nil) |
| |
| var req *http.Request |
| req, err = http.NewRequest("GET", "/ws/v1/partition/default/nodes", strings.NewReader("")) |
| vars := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| } |
| req = mux.SetURLVars(req, vars) |
| assert.NilError(t, err, "Get Nodes for PartitionNodes Handler request failed") |
| resp := &MockResponseWriter{} |
| var partitionNodesDao []*dao.NodeDAOInfo |
| getPartitionNodes(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &partitionNodesDao) |
| assert.NilError(t, err, "failed to unmarshal PartitionNodes dao response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, 1, len(partitionNodesDao[0].Allocations)) |
| for _, node := range partitionNodesDao { |
| assert.Equal(t, 1, len(node.Allocations)) |
| if !node.IsReserved { |
| assert.Equal(t, len(node.Reservations), 0) |
| } else { |
| assert.Check(t, len(node.Reservations) > 0, "Get wrong reservation info from node dao") |
| } |
| |
| if node.NodeID == node1ID { |
| assert.Equal(t, node.NodeID, node1ID) |
| assert.Equal(t, "alloc-1", node.Allocations[0].AllocationKey) |
| assert.Equal(t, "alloc-1-uuid", node.Allocations[0].UUID) |
| assert.DeepEqual(t, map[string]int64{"memory": 50, "vcore": 30}, node.Utilized) |
| } else { |
| assert.Equal(t, node.NodeID, node2ID) |
| assert.Equal(t, "alloc-2", node.Allocations[0].AllocationKey) |
| assert.Equal(t, "alloc-2-uuid", node.Allocations[0].UUID) |
| assert.DeepEqual(t, map[string]int64{"memory": 30, "vcore": 50}, node.Utilized) |
| } |
| } |
| |
| var req1 *http.Request |
| req1, err = http.NewRequest("GET", "/ws/v1/partition/default/nodes", strings.NewReader("")) |
| vars1 := map[string]string{ |
| "partition": "notexists", |
| } |
| req1 = mux.SetURLVars(req1, vars1) |
| assert.NilError(t, err, "Get Nodes for PartitionNodes Handler request failed") |
| resp1 := &MockResponseWriter{} |
| getPartitionNodes(resp1, req1) |
| assertPartitionExists(t, resp1) |
| } |
| |
// addApp adds an app to the given partition and asserts the app count and state
| func addApp(t *testing.T, id string, part *scheduler.PartitionContext, queueName string, isCompleted bool) *objects.Application { |
| initSize := len(part.GetApplications()) |
| app := newApplication(id, part.Name, queueName, rmID, security.UserGroup{}) |
| err := part.AddApplication(app) |
| assert.NilError(t, err, "Failed to add Application to Partition.") |
| assert.Equal(t, app.CurrentState(), objects.New.String()) |
| assert.Equal(t, 1+initSize, len(part.GetApplications())) |
| if isCompleted { |
// the partition itself is not under test here, so it is fine to skip updating it
| app.UnSetQueue() |
| } |
| return app |
| } |
| |
| func TestGetQueueApplicationsHandler(t *testing.T) { |
| part := setup(t, configDefault, 1) |
| |
| // add two applications |
| app := addApp(t, "app-1", part, "root.default", false) |
| addApp(t, "app-2", part, "root.default", true) |
| |
| // add placeholder to test PlaceholderDAOInfo |
| tg := "tg-1" |
| res := &si.Resource{ |
| Resources: map[string]*si.Quantity{"vcore": {Value: 1}}, |
| } |
| ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{ |
| ApplicationID: "app-1", |
| PartitionName: part.Name, |
| TaskGroupName: tg, |
| ResourceAsk: res, |
| Placeholder: true, |
| MaxAllocations: 1}) |
| err := app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| app.SetTimedOutPlaceholder(tg, 1) |
| |
| NewWebApp(schedulerContext, nil) |
| |
| var req *http.Request |
| req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/applications", strings.NewReader("")) |
| vars := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| "queue": "root.default", |
| } |
| req = mux.SetURLVars(req, vars) |
| assert.NilError(t, err, "Get Queue Applications Handler request failed") |
| resp := &MockResponseWriter{} |
| var appsDao []*dao.ApplicationDAOInfo |
| getQueueApplications(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &appsDao) |
| assert.NilError(t, err, "failed to unmarshal applications dao response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, len(appsDao), 2) |
| |
| if !appsDao[0].HasReserved { |
| assert.Equal(t, len(appsDao[0].Reservations), 0) |
| } else { |
| assert.Check(t, len(appsDao[0].Reservations) > 0, "app should have at least 1 reservation") |
| } |
| |
| // check PlaceholderData |
| assert.Equal(t, len(appsDao[0].PlaceholderData), 1) |
| assert.Equal(t, appsDao[0].PlaceholderData[0].TaskGroupName, tg) |
| assert.DeepEqual(t, appsDao[0].PlaceholderData[0].MinResource, map[string]int64{"vcore": 1}) |
| assert.Equal(t, appsDao[0].PlaceholderData[0].Replaced, int64(0)) |
| assert.Equal(t, appsDao[0].PlaceholderData[0].Count, int64(1)) |
| assert.Equal(t, appsDao[0].PlaceholderData[0].TimedOut, int64(1)) |
| |
| // test nonexistent partition |
| var req1 *http.Request |
| req1, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/applications", strings.NewReader("")) |
| vars1 := map[string]string{ |
| "partition": "notexists", |
| "queue": "root.default", |
| } |
| req1 = mux.SetURLVars(req1, vars1) |
| assert.NilError(t, err, "Get Queue Applications Handler request failed") |
| resp1 := &MockResponseWriter{} |
| getQueueApplications(resp1, req1) |
| assertPartitionExists(t, resp1) |
| |
| // test nonexistent queue |
| var req2 *http.Request |
| req2, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/applications", strings.NewReader("")) |
| vars2 := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| "queue": "notexists", |
| } |
| req2 = mux.SetURLVars(req2, vars2) |
| assert.NilError(t, err, "Get Queue Applications Handler request failed") |
| resp2 := &MockResponseWriter{} |
| getQueueApplications(resp2, req2) |
| assertQueueExists(t, resp2) |
| |
| // test queue without applications |
| var req3 *http.Request |
| req3, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.noapps/applications", strings.NewReader("")) |
| vars3 := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| "queue": "root.noapps", |
| } |
| req3 = mux.SetURLVars(req3, vars3) |
| assert.NilError(t, err, "Get Queue Applications Handler request failed") |
| resp3 := &MockResponseWriter{} |
| var appsDao3 []*dao.ApplicationDAOInfo |
| getQueueApplications(resp3, req3) |
| err = json.Unmarshal(resp3.outputBytes, &appsDao3) |
| assert.NilError(t, err, "failed to unmarshal applications dao response from response body: %s", string(resp.outputBytes)) |
| assert.Equal(t, len(appsDao3), 0) |
| } |
| |
| func TestGetApplicationHandler(t *testing.T) { |
| part := setup(t, configDefault, 1) |
| |
| // add 1 application |
| app := addApp(t, "app-1", part, "root.default", false) |
| res := &si.Resource{ |
| Resources: map[string]*si.Quantity{"vcore": {Value: 1}}, |
| } |
| ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{ |
| ApplicationID: "app-1", |
| PartitionName: part.Name, |
| ResourceAsk: res, |
| MaxAllocations: 1}) |
| err := app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| |
| NewWebApp(schedulerContext, nil) |
| |
| var req *http.Request |
| req, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/application/app-1", strings.NewReader("")) |
| vars := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| "queue": "root.default", |
| "application": "app-1", |
| } |
| req = mux.SetURLVars(req, vars) |
| assert.NilError(t, err, "Get Application Handler request failed") |
| resp := &MockResponseWriter{} |
| var appsDao *dao.ApplicationDAOInfo |
| getApplication(resp, req) |
| err = json.Unmarshal(resp.outputBytes, &appsDao) |
| assert.NilError(t, err, "failed to unmarshal applications dao response from response body: %s", string(resp.outputBytes)) |
| |
| if !appsDao.HasReserved { |
| assert.Equal(t, len(appsDao.Reservations), 0) |
| } else { |
| assert.Check(t, len(appsDao.Reservations) > 0, "app should have at least 1 reservation") |
| } |
| |
| // test nonexistent partition |
| var req1 *http.Request |
| req1, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/application/app-1", strings.NewReader("")) |
| vars1 := map[string]string{ |
| "partition": "notexists", |
| "queue": "root.default", |
| "application": "app-1", |
| } |
| req1 = mux.SetURLVars(req1, vars1) |
| assert.NilError(t, err, "Get Application Handler request failed") |
| resp1 := &MockResponseWriter{} |
| getApplication(resp1, req1) |
| assertPartitionExists(t, resp1) |
| |
| // test nonexistent queue |
| var req2 *http.Request |
| req2, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.default/application/app-1", strings.NewReader("")) |
| vars2 := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| "queue": "notexists", |
| "application": "app-1", |
| } |
| req2 = mux.SetURLVars(req2, vars2) |
| assert.NilError(t, err, "Get Application Handler request failed") |
| resp2 := &MockResponseWriter{} |
| getApplication(resp2, req2) |
| assertQueueExists(t, resp2) |
| |
| // test nonexistent application |
| var req3 *http.Request |
| req3, err = http.NewRequest("GET", "/ws/v1/partition/default/queue/root.noapps/application/app-1", strings.NewReader("")) |
| vars3 := map[string]string{ |
| "partition": partitionNameWithoutClusterID, |
| "queue": "root.noapps", |
| "application": "app-1", |
| } |
| req3 = mux.SetURLVars(req3, vars3) |
| assert.NilError(t, err, "Get Application Handler request failed") |
| resp3 := &MockResponseWriter{} |
| getApplication(resp3, req3) |
| assertApplicationExists(t, resp3) |
| } |
| |
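// assertPartitionExists asserts that the handler rejected the request with a bad request
// error because the requested partition does not exist.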
| func assertPartitionExists(t *testing.T, resp *MockResponseWriter) { |
| var errInfo dao.YAPIError |
| err := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err, "failed to unmarshal applications dao response from response body") |
| assert.Equal(t, http.StatusBadRequest, resp.statusCode, "Incorrect Status code") |
| assert.Equal(t, errInfo.Message, PartitionDoesNotExists, "JSON error message is incorrect") |
| assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest) |
| } |
| |
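// assertQueueExists asserts that the handler rejected the request with a bad request
// error because the requested queue does not exist.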
| func assertQueueExists(t *testing.T, resp *MockResponseWriter) { |
| var errInfo dao.YAPIError |
| err := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err, "failed to unmarshal applications dao response from response body") |
| assert.Equal(t, http.StatusBadRequest, resp.statusCode, "Incorrect Status code") |
| assert.Equal(t, errInfo.Message, QueueDoesNotExists, "JSON error message is incorrect") |
| assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest) |
| } |
| |
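// assertApplicationExists asserts that the handler rejected the request with a bad request
// error because the requested application does not exist.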
| func assertApplicationExists(t *testing.T, resp *MockResponseWriter) { |
| var errInfo dao.YAPIError |
| err := json.Unmarshal(resp.outputBytes, &errInfo) |
| assert.NilError(t, err, "failed to unmarshal applications dao response from response body") |
| assert.Equal(t, http.StatusBadRequest, resp.statusCode, "Incorrect Status code") |
| assert.Equal(t, errInfo.Message, "Application not found", "JSON error message is incorrect") |
| assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest) |
| } |
| |
| func TestValidateQueue(t *testing.T) { |
| err := validateQueue("root.test.test123") |
| assert.NilError(t, err, "Queue path is correct but stil throwing error.") |
| |
| invalidQueuePath := "root.test.test123@" |
| invalidQueueName := "test123@" |
| err1 := validateQueue(invalidQueuePath) |
| assert.Error(t, err1, "problem in queue query parameter parsing as queue param "+invalidQueuePath+" contains invalid queue name "+invalidQueueName+". Queue name must only have alphanumeric characters, - or _, and be no longer than 64 characters") |
| |
| err2 := validateQueue("root") |
| assert.NilError(t, err2, "Queue path is correct but stil throwing error.") |
| } |
| |
| func TestFullStateDumpPath(t *testing.T) { |
| schedulerContext = prepareSchedulerContext(t, false) |
| |
| partitionContext := schedulerContext.GetPartitionMapClone() |
| context := partitionContext[normalizedPartitionName] |
| app := newApplication("appID", normalizedPartitionName, "root.default", rmID, security.UserGroup{}) |
| err := context.AddApplication(app) |
| assert.NilError(t, err, "failed to add Application to partition") |
| |
| imHistory = history.NewInternalMetricsHistory(5) |
| req, err2 := http.NewRequest("GET", "/ws/v1/getfullstatedump", strings.NewReader("")) |
| assert.NilError(t, err2) |
| req = mux.SetURLVars(req, make(map[string]string)) |
| resp := &MockResponseWriter{} |
| |
| getFullStateDump(resp, req) |
| statusCode := resp.statusCode |
| assert.Check(t, statusCode != http.StatusInternalServerError, "response status code") |
| var aggregated AggregatedStateInfo |
| err = json.Unmarshal(resp.outputBytes, &aggregated) |
| assert.NilError(t, err) |
| verifyStateDumpJSON(t, &aggregated) |
| } |
| |
| func TestPeriodicStateDumpDefaultPath(t *testing.T) { |
| schedulerContext = prepareSchedulerContext(t, false) |
| defer deleteStateDumpFile(t, schedulerContext) |
| |
| partitionContext := schedulerContext.GetPartitionMapClone() |
| context := partitionContext[normalizedPartitionName] |
| app := newApplication("appID", normalizedPartitionName, "root.default", rmID, security.UserGroup{}) |
| err := context.AddApplication(app) |
| assert.NilError(t, err, "failed to add Application to partition") |
| |
| imHistory = history.NewInternalMetricsHistory(5) |
| file, err := os.OpenFile(defaultStateDumpFilePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666) |
| assert.NilError(t, err, "failed to open state dump file") |
| |
| err = doStateDump(file, true) |
| assert.NilError(t, err) |
| fi, err := os.Stat(defaultStateDumpFilePath) |
| assert.NilError(t, err) |
| assert.Assert(t, fi.Size() > 0, "json response is empty") |
| var aggregated AggregatedStateInfo |
| receivedBytes, err := os.ReadFile(defaultStateDumpFilePath) |
| assert.NilError(t, err) |
| err = json.Unmarshal(receivedBytes, &aggregated) |
| assert.NilError(t, err) |
| verifyStateDumpJSON(t, &aggregated) |
| } |
| |
| func TestPeriodicStateDumpNonDefaultPath(t *testing.T) { |
| stateDumpFilePath := "tmp/non-default-yunikorn-state.txt" |
| defer deleteStateDumpDir(t) |
| schedulerContext = prepareSchedulerContext(t, true) |
| defer deleteStateDumpFile(t, schedulerContext) |
| |
| partitionContext := schedulerContext.GetPartitionMapClone() |
| context := partitionContext[normalizedPartitionName] |
| app := newApplication("appID", normalizedPartitionName, "root.default", rmID, security.UserGroup{}) |
| err := context.AddApplication(app) |
| assert.NilError(t, err, "failed to add Application to partition") |
| |
| imHistory = history.NewInternalMetricsHistory(5) |
| file, err := os.OpenFile(stateDumpFilePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666) |
| assert.NilError(t, err, "failed to open state dump file") |
| |
| err = doStateDump(file, true) |
| assert.NilError(t, err) |
| fi, err := os.Stat(stateDumpFilePath) |
| assert.NilError(t, err) |
| assert.Assert(t, fi.Size() > 0, "json response is empty") |
| var aggregated AggregatedStateInfo |
| receivedBytes, err := os.ReadFile(stateDumpFilePath) |
| assert.NilError(t, err) |
| err = json.Unmarshal(receivedBytes, &aggregated) |
| assert.NilError(t, err) |
| verifyStateDumpJSON(t, &aggregated) |
| } |
| |
| func TestEnableDisablePeriodicStateDump(t *testing.T) { |
| schedulerContext = prepareSchedulerContext(t, false) |
| defer deleteStateDumpFile(t, schedulerContext) |
| defer terminateGoroutine() |
| |
| imHistory = history.NewInternalMetricsHistory(5) |
| req, err := http.NewRequest("PUT", "/ws/v1/periodicstatedump", strings.NewReader("")) |
| assert.NilError(t, err) |
| vars := map[string]string{ |
| "periodSeconds": "3", |
| "switch": "enable", |
| } |
| req = mux.SetURLVars(req, vars) |
| resp := &MockResponseWriter{} |
| |
| // enable state dump, check file contents |
| handlePeriodicStateDump(resp, req) |
| statusCode := resp.statusCode |
| assert.Equal(t, statusCode, 0, "response status code") |
| |
| waitForStateDumpFile(t) |
| fileContents, err2 := os.ReadFile(defaultStateDumpFilePath) |
| assert.NilError(t, err2) |
| var aggregated AggregatedStateInfo |
| err3 := json.Unmarshal(fileContents, &aggregated) |
| assert.NilError(t, err3) |
| verifyStateDumpJSON(t, &aggregated) |
| |
| // disable |
| req, err = http.NewRequest("PUT", "/ws/v1/periodicstatedump", strings.NewReader("")) |
| assert.NilError(t, err) |
| vars = map[string]string{ |
| "switch": "disable", |
| } |
| req = mux.SetURLVars(req, vars) |
| resp = &MockResponseWriter{} |
| handlePeriodicStateDump(resp, req) |
| statusCode = resp.statusCode |
| assert.Equal(t, statusCode, 0, "response status code") |
| } |
| |
| func TestTryEnableStateDumpTwice(t *testing.T) { |
| defer deleteStateDumpFile(t, schedulerContext) |
| defer terminateGoroutine() |
| |
| req, err := http.NewRequest("PUT", "/ws/v1/periodicstatedump", strings.NewReader("")) |
| assert.NilError(t, err) |
| vars := map[string]string{ |
| "switch": "enable", |
| } |
| req = mux.SetURLVars(req, vars) |
| resp := &MockResponseWriter{} |
| |
| // first call - succeeds |
| handlePeriodicStateDump(resp, req) |
| statusCode := resp.statusCode |
| assert.Equal(t, statusCode, 0, "response status code") |
| |
| // second call - expected to fail |
| handlePeriodicStateDump(resp, req) |
| statusCode = resp.statusCode |
| assert.Equal(t, statusCode, http.StatusInternalServerError, "response status code") |
| } |
| |
| func TestTryDisableNotRunningStateDump(t *testing.T) { |
| req, err := http.NewRequest("PUT", "/ws/v1/periodicstatedump", strings.NewReader("")) |
| assert.NilError(t, err) |
| vars := map[string]string{ |
| "switch": "disable", |
| } |
| req = mux.SetURLVars(req, vars) |
| resp := &MockResponseWriter{} |
| |
| handlePeriodicStateDump(resp, req) |
| |
| statusCode := resp.statusCode |
| assert.Equal(t, statusCode, http.StatusInternalServerError, "response status code") |
| } |
| |
| func TestIllegalStateDumpRequests(t *testing.T) { |
// missing "switch" parameter
| req, err := http.NewRequest("PUT", "/ws/v1/periodicstatedump", strings.NewReader("")) |
| assert.NilError(t, err) |
| resp := &MockResponseWriter{} |
| handlePeriodicStateDump(resp, req) |
| statusCode := resp.statusCode |
| assert.Equal(t, statusCode, http.StatusBadRequest, "response status code") |
| |
// illegal "switch" value
| req, err = http.NewRequest("PUT", "/ws/v1/periodicstatedump", strings.NewReader("")) |
| assert.NilError(t, err) |
| vars := map[string]string{ |
| "switch": "illegal", |
| } |
| req = mux.SetURLVars(req, vars) |
| resp = &MockResponseWriter{} |
| handlePeriodicStateDump(resp, req) |
| statusCode = resp.statusCode |
| assert.Equal(t, statusCode, http.StatusBadRequest, "response status code") |
| } |
| |
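// prepareSchedulerContext loads either the default config or the config with a custom
// state dump file path and creates a new cluster context with a single partition.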
| func prepareSchedulerContext(t *testing.T, stateDumpConf bool) *scheduler.ClusterContext { |
| if !stateDumpConf { |
| configs.MockSchedulerConfigByData([]byte(configDefault)) |
| } else { |
| configs.MockSchedulerConfigByData([]byte(configStateDumpFilePath)) |
| } |
| var err error |
| schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup) |
| assert.NilError(t, err, "Error when load clusterInfo from config") |
| assert.Equal(t, 1, len(schedulerContext.GetPartitionMapClone())) |
| |
| return schedulerContext |
| } |
| |
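// waitForStateDumpFile polls until the default state dump file exists and is non-empty,
// failing the test after 10 attempts (roughly 10 seconds).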
| func waitForStateDumpFile(t *testing.T) { |
| var attempts int |
| for { |
| info, err := os.Stat(defaultStateDumpFilePath) |
| // tolerate only "file not found" errors |
| if err != nil && !os.IsNotExist(err) { |
| t.Fatal(err) |
| } |
| |
| if info != nil && info.Size() > 0 { |
| break |
| } |
| |
| if attempts++; attempts > 10 { |
| t.Fatal("state dump file has not been created or still empty") |
| } |
| |
| time.Sleep(1 * time.Second) |
| } |
| } |
| |
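// deleteStateDumpFile removes the state dump file that belongs to the given cluster
// context, failing the test if the removal fails.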
| func deleteStateDumpFile(t *testing.T, cc *scheduler.ClusterContext) { |
| stateDumpFilePath := getStateDumpFilePath(cc) |
| if err := os.Remove(stateDumpFilePath); err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
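// deleteStateDumpDir removes the directory used by the non-default state dump path test.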
| func deleteStateDumpDir(t *testing.T) { |
| if err := os.Remove("tmp"); err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
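// terminateGoroutine stops the periodic state dump goroutine, if it is still running,
// by signalling and then closing the abort channel.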
| func terminateGoroutine() { |
| if !isAbortClosed() { |
| abort <- struct{}{} |
| close(abort) |
| periodicStateDump = false |
| } |
| } |
| |
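// isAbortClosed checks, without blocking, whether a receive from the abort channel
// succeeds, which indicates that the channel has been closed (or already signalled).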
| func isAbortClosed() bool { |
| select { |
| case <-abort: |
| return true |
| default: |
| } |
| return false |
| } |
| |
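// verifyStateDumpJSON checks that the aggregated state dump contains data for all
// expected sections.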
| func verifyStateDumpJSON(t *testing.T, aggregated *AggregatedStateInfo) { |
| assert.Check(t, aggregated.Timestamp != 0) |
| assert.Check(t, len(aggregated.Partitions) > 0) |
| assert.Check(t, len(aggregated.Nodes) > 0) |
| assert.Check(t, len(aggregated.ClusterInfo) > 0) |
| assert.Check(t, len(aggregated.Queues) > 0) |
| assert.Check(t, len(aggregated.LogLevel) > 0) |
| } |