blob: 04c70c507ff78b424a6d1e109149e9d7ea8cf8c5 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webservice
import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
metrics2 "github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/metrics/history"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/scheduler"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
const (
PartitionDoesNotExists = "Partition not found"
MissingParamsName = "Missing parameters"
QueueDoesNotExists = "Queue not found"
UserDoesNotExists = "User not found"
GroupDoesNotExists = "Group not found"
UserNameMissing = "User name is missing"
GroupNameMissing = "Group name is missing"
ApplicationDoesNotExists = "Application not found"
NodeDoesNotExists = "Node not found"
)
var allowedActiveStatusMsg string
var allowedAppActiveStatuses map[string]bool
var streamingLimiter *StreamingLimiter
func init() {
allowedAppActiveStatuses = make(map[string]bool)
allowedAppActiveStatuses["new"] = true
allowedAppActiveStatuses["accepted"] = true
allowedAppActiveStatuses["running"] = true
allowedAppActiveStatuses["completing"] = true
allowedAppActiveStatuses["failing"] = true
allowedAppActiveStatuses["resuming"] = true
var activeStatuses []string
for k := range allowedAppActiveStatuses {
activeStatuses = append(activeStatuses, k)
}
allowedActiveStatusMsg = fmt.Sprintf("Only following active statuses are allowed: %s", strings.Join(activeStatuses, ","))
streamingLimiter = NewStreamingLimiter()
}
func getStackInfo(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
var stack = func() []byte {
buf := make([]byte, 1024)
for {
n := runtime.Stack(buf, true)
if n < len(buf) {
return buf[:n]
}
buf = make([]byte, 2*len(buf))
}
}
if _, err := w.Write(stack()); err != nil {
log.Log(log.REST).Error("GetStackInfo error", zap.Error(err))
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getClusterInfo(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
lists := schedulerContext.GetPartitionMapClone()
clustersInfo := getClusterDAO(lists)
if err := json.NewEncoder(w).Encode(clustersInfo); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func validateQueue(queuePath string) error {
if queuePath != "" {
queueNameArr := strings.Split(queuePath, ".")
for _, name := range queueNameArr {
if !configs.QueueNameRegExp.MatchString(name) && name != common.RecoveryQueue {
return fmt.Errorf("problem in queue query parameter parsing as queue param "+
"%s contains invalid queue name %s. Queue name must only have "+
"alphanumeric characters, - or _, and be no longer than 64 characters except the recovery queue root.@recovery@", queuePath, name)
}
}
}
return nil
}
func validateConf(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
requestBytes, err := io.ReadAll(r.Body)
if err == nil {
_, err = configs.LoadSchedulerConfigFromByteArray(requestBytes)
}
var result dao.ValidateConfResponse
if err != nil {
result.Allowed = false
result.Reason = err.Error()
} else {
result.Allowed = true
}
if err = json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func writeHeaders(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,HEAD,OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "X-Requested-With,Content-Type,Accept,Origin")
}
func buildJSONErrorResponse(w http.ResponseWriter, detail string, code int) {
w.WriteHeader(code)
errorInfo := dao.NewYAPIError(nil, code, detail)
if jsonErr := json.NewEncoder(w).Encode(errorInfo); jsonErr != nil {
log.Log(log.REST).Error(fmt.Sprintf("Problem in sending error response in JSON format. Error response: %s", detail))
}
}
func getClusterJSON(partition *scheduler.PartitionContext) *dao.ClusterDAOInfo {
clusterInfo := &dao.ClusterDAOInfo{}
clusterInfo.StartTime = schedulerContext.GetStartTime().UnixNano()
rmInfo := schedulerContext.GetRMInfoMapClone()
clusterInfo.RMBuildInformation = getRMBuildInformation(rmInfo)
clusterInfo.PartitionName = common.GetPartitionNameWithoutClusterID(partition.Name)
clusterInfo.ClusterName = "kubernetes"
return clusterInfo
}
func getClusterUtilJSON(partition *scheduler.PartitionContext) []*dao.ClusterUtilDAOInfo {
var utils []*dao.ClusterUtilDAOInfo
var getResource = true
total := partition.GetTotalPartitionResource()
if resources.IsZero(total) {
getResource = false
}
used := partition.GetAllocatedResource()
if len(used.Resources) == 0 {
getResource = false
}
if getResource {
percent := resources.CalculateAbsUsedCapacity(total, used)
for name, value := range percent.Resources {
utilization := &dao.ClusterUtilDAOInfo{
ResourceType: name,
Total: int64(total.Resources[name]),
Used: int64(used.Resources[name]),
Usage: fmt.Sprintf("%d%%", int64(value)),
}
utils = append(utils, utilization)
}
} else if !getResource {
utilization := &dao.ClusterUtilDAOInfo{
ResourceType: "N/A",
Total: int64(-1),
Used: int64(-1),
Usage: "N/A",
}
utils = append(utils, utilization)
}
return utils
}
func getAllocationDAO(alloc *objects.Allocation) *dao.AllocationDAOInfo {
var requestTime int64
if alloc.IsPlaceholderUsed() {
requestTime = alloc.GetPlaceholderCreateTime().UnixNano()
} else {
requestTime = alloc.GetAsk().GetCreateTime().UnixNano()
}
allocTime := alloc.GetCreateTime().UnixNano()
allocDAO := &dao.AllocationDAOInfo{
AllocationKey: alloc.GetAllocationKey(),
AllocationTags: alloc.GetTagsClone(),
RequestTime: requestTime,
AllocationTime: allocTime,
AllocationDelay: allocTime - requestTime,
ResourcePerAlloc: alloc.GetAllocatedResource().DAOMap(),
PlaceholderUsed: alloc.IsPlaceholderUsed(),
Placeholder: alloc.IsPlaceholder(),
TaskGroupName: alloc.GetTaskGroup(),
Priority: strconv.Itoa(int(alloc.GetPriority())),
NodeID: alloc.GetNodeID(),
ApplicationID: alloc.GetApplicationID(),
Partition: alloc.GetPartitionName(),
Preempted: alloc.IsPreempted(),
}
return allocDAO
}
func getAllocationsDAO(allocations []*objects.Allocation) []*dao.AllocationDAOInfo {
allocsDAO := make([]*dao.AllocationDAOInfo, 0, len(allocations))
for _, alloc := range allocations {
allocsDAO = append(allocsDAO, getAllocationDAO(alloc))
}
return allocsDAO
}
func getPlaceholderDAO(ph *objects.PlaceholderData) *dao.PlaceholderDAOInfo {
phDAO := &dao.PlaceholderDAOInfo{
TaskGroupName: ph.TaskGroupName,
Count: ph.Count,
MinResource: ph.MinResource.DAOMap(),
Replaced: ph.Replaced,
TimedOut: ph.TimedOut,
}
return phDAO
}
func getPlaceholdersDAO(entries []*objects.PlaceholderData) []*dao.PlaceholderDAOInfo {
phsDAO := make([]*dao.PlaceholderDAOInfo, 0, len(entries))
for _, entry := range entries {
phsDAO = append(phsDAO, getPlaceholderDAO(entry))
}
return phsDAO
}
func getStateDAO(entry *objects.StateLogEntry) *dao.StateDAOInfo {
state := &dao.StateDAOInfo{
Time: entry.Time.UnixNano(),
ApplicationState: entry.ApplicationState,
}
return state
}
func getStatesDAO(entries []*objects.StateLogEntry) []*dao.StateDAOInfo {
statesDAO := make([]*dao.StateDAOInfo, 0, len(entries))
for _, entry := range entries {
statesDAO = append(statesDAO, getStateDAO(entry))
}
return statesDAO
}
func getApplicationDAO(app *objects.Application) *dao.ApplicationDAOInfo {
if app == nil {
return &dao.ApplicationDAOInfo{}
}
return &dao.ApplicationDAOInfo{
ApplicationID: app.ApplicationID,
UsedResource: app.GetAllocatedResource().DAOMap(),
MaxUsedResource: app.GetMaxAllocatedResource().DAOMap(),
PendingResource: app.GetPendingResource().DAOMap(),
Partition: common.GetPartitionNameWithoutClusterID(app.Partition),
QueueName: app.GetQueuePath(),
SubmissionTime: app.SubmissionTime.UnixNano(),
FinishedTime: common.ZeroTimeInUnixNano(app.FinishedTime()),
Requests: getAllocationAsksDAO(app.GetAllRequests()),
Allocations: getAllocationsDAO(app.GetAllAllocations()),
State: app.CurrentState(),
User: app.GetUser().User,
Groups: app.GetUser().Groups,
RejectedMessage: app.GetRejectedMessage(),
PlaceholderData: getPlaceholdersDAO(app.GetAllPlaceholderData()),
StateLog: getStatesDAO(app.GetStateLog()),
HasReserved: app.HasReserved(),
Reservations: app.GetReservations(),
MaxRequestPriority: app.GetAskMaxPriority(),
}
}
func getAllocationLogsDAO(logEntries []*objects.AllocationLogEntry) []*dao.AllocationAskLogDAOInfo {
logsDAO := make([]*dao.AllocationAskLogDAOInfo, len(logEntries))
sort.SliceStable(logEntries, func(i, j int) bool {
return logEntries[i].LastOccurrence.Before(logEntries[j].LastOccurrence)
})
for i, entry := range logEntries {
logsDAO[i] = &dao.AllocationAskLogDAOInfo{
Message: entry.Message,
LastOccurrence: entry.LastOccurrence.UnixNano(),
Count: entry.Count,
}
}
return logsDAO
}
func getAllocationAskDAO(ask *objects.AllocationAsk) *dao.AllocationAskDAOInfo {
return &dao.AllocationAskDAOInfo{
AllocationKey: ask.GetAllocationKey(),
AllocationTags: ask.GetTagsClone(),
RequestTime: ask.GetCreateTime().UnixNano(),
ResourcePerAlloc: ask.GetAllocatedResource().DAOMap(),
Priority: strconv.Itoa(int(ask.GetPriority())),
RequiredNodeID: ask.GetRequiredNode(),
ApplicationID: ask.GetApplicationID(),
Partition: common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
Placeholder: ask.IsPlaceholder(),
TaskGroupName: ask.GetTaskGroup(),
AllocationLog: getAllocationLogsDAO(ask.GetAllocationLog()),
TriggeredPreemption: ask.HasTriggeredPreemption(),
Originator: ask.IsOriginator(),
SchedulingAttempted: ask.IsSchedulingAttempted(),
TriggeredScaleUp: ask.HasTriggeredScaleUp(),
}
}
func getAllocationAsksDAO(asks []*objects.AllocationAsk) []*dao.AllocationAskDAOInfo {
asksDAO := make([]*dao.AllocationAskDAOInfo, 0, len(asks))
for _, ask := range asks {
if !ask.IsAllocated() {
asksDAO = append(asksDAO, getAllocationAskDAO(ask))
}
}
return asksDAO
}
func getNodeDAO(node *objects.Node) *dao.NodeDAOInfo {
return &dao.NodeDAOInfo{
NodeID: node.NodeID,
HostName: node.Hostname,
RackName: node.Rackname,
Attributes: node.GetAttributes(),
Capacity: node.GetCapacity().DAOMap(),
Occupied: node.GetOccupiedResource().DAOMap(),
Allocated: node.GetAllocatedResource().DAOMap(),
Available: node.GetAvailableResource().DAOMap(),
Utilized: node.GetUtilizedResource().DAOMap(),
Allocations: getAllocationsDAO(node.GetAllAllocations()),
Schedulable: node.IsSchedulable(),
IsReserved: node.IsReserved(),
Reservations: node.GetReservationKeys(),
}
}
func getNodesDAO(entries []*objects.Node) []*dao.NodeDAOInfo {
nodesDAO := make([]*dao.NodeDAOInfo, 0, len(entries))
for _, entry := range entries {
nodesDAO = append(nodesDAO, getNodeDAO(entry))
}
return nodesDAO
}
// getNodeUtilisation loads the node utilisation based on the dominant resource used
// for the default partition. Dominant resource is defined as the highest utilised resource
// type on the root queue based on the registered resources.
// Only check the default partition
// Deprecated - To be removed in next major release. Replaced with getNodesUtilisations
func getNodeUtilisation(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
partitionContext := schedulerContext.GetPartitionWithoutClusterID(configs.DefaultPartition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusInternalServerError)
return
}
// calculate the dominant resource based on root queue usage and size
rootQ := partitionContext.GetQueue(configs.RootQueue)
rootMax := rootQ.GetMaxResource()
// if no nodes have been registered return an empty object
nodesDao := &dao.NodesUtilDAOInfo{}
if !resources.IsZero(rootMax) {
// if nothing is used we get an empty dominant resource and return an empty object
rootUsed := rootQ.GetAllocatedResource()
dominant := rootUsed.DominantResourceType(rootMax)
nodesDao = getNodesUtilJSON(partitionContext, dominant)
}
if err := json.NewEncoder(w).Encode(nodesDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
// getNodesUtilJSON loads the nodes utilisation for a partition for a specific resource type.
// Deprecated - To be removed in next major release. Replaced with getPartitionNodesUtilJSON
func getNodesUtilJSON(partition *scheduler.PartitionContext, name string) *dao.NodesUtilDAOInfo {
mapResult := make([]int, 10)
mapName := make([][]string, 10)
var v float64
var nodeUtil []*dao.NodeUtilDAOInfo
var idx int
for _, node := range partition.GetNodes() {
// check resource exist or not: only count if node advertises the resource
total := node.GetCapacity()
if _, ok := total.Resources[name]; !ok {
continue
}
resourceAllocated := node.GetAllocatedResource()
// if resource exist in node, record the bucket it should go into,
// otherwise none is used, and it should end up in the 0 bucket
if _, ok := resourceAllocated.Resources[name]; ok {
v = float64(resources.CalculateAbsUsedCapacity(total, resourceAllocated).Resources[name])
idx = int(math.Dim(math.Ceil(v/10), 1))
} else {
idx = 0
}
mapResult[idx]++
mapName[idx] = append(mapName[idx], node.NodeID)
}
// put number of nodes and node name to different buckets
for k := 0; k < 10; k++ {
util := &dao.NodeUtilDAOInfo{
BucketName: fmt.Sprintf("%d", k*10) + "-" + fmt.Sprintf("%d", (k+1)*10) + "%",
NumOfNodes: int64(mapResult[k]),
NodeNames: mapName[k],
}
nodeUtil = append(nodeUtil, util)
}
return &dao.NodesUtilDAOInfo{
ResourceType: name,
NodesUtil: nodeUtil,
}
}
func getNodeUtilisations(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
var result []*dao.PartitionNodesUtilDAOInfo
for _, part := range schedulerContext.GetPartitionMapClone() {
result = append(result, getPartitionNodesUtilJSON(part))
}
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
// getPartitionNodesUtilJSON retrieves the utilization of all resource types for nodes within a specific partition.
func getPartitionNodesUtilJSON(partition *scheduler.PartitionContext) *dao.PartitionNodesUtilDAOInfo {
type UtilizationBucket struct {
NodeCount []int // 10 buckets, each bucket contains number of nodes
NodeList [][]string // 10 buckets, each bucket contains node name list
}
resourceBuckets := make(map[string]*UtilizationBucket) // key is resource type, value is UtilizationBucket
// put nodes to buckets
for _, node := range partition.GetNodes() {
capacity := node.GetCapacity()
resourceAllocated := node.GetAllocatedResource()
absUsedCapacity := resources.CalculateAbsUsedCapacity(capacity, resourceAllocated)
// append to bucket based on resource type, only count if node advertises the resource
for resourceType := range capacity.Resources {
idx := 0
if absValue, ok := absUsedCapacity.Resources[resourceType]; ok {
v := float64(absValue)
idx = int(math.Dim(math.Ceil(v/10), 1))
}
// create resource bucket if not exist
if _, ok := resourceBuckets[resourceType]; !ok {
resourceBuckets[resourceType] = &UtilizationBucket{
NodeCount: make([]int, 10),
NodeList: make([][]string, 10),
}
}
resourceBuckets[resourceType].NodeCount[idx]++
resourceBuckets[resourceType].NodeList[idx] = append(resourceBuckets[resourceType].NodeList[idx], node.NodeID)
}
}
// build result
var nodesUtilList []*dao.NodesUtilDAOInfo
for resourceType, bucket := range resourceBuckets {
var nodesUtil []*dao.NodeUtilDAOInfo
for k := 0; k < 10; k++ {
util := &dao.NodeUtilDAOInfo{
BucketName: fmt.Sprintf("%d", k*10) + "-" + fmt.Sprintf("%d", (k+1)*10) + "%",
NumOfNodes: int64(bucket.NodeCount[k]),
NodeNames: bucket.NodeList[k],
}
nodesUtil = append(nodesUtil, util)
}
nodeUtilization := &dao.NodesUtilDAOInfo{
ResourceType: resourceType,
NodesUtil: nodesUtil,
}
nodesUtilList = append(nodesUtilList, nodeUtilization)
}
return &dao.PartitionNodesUtilDAOInfo{
ClusterID: partition.RmID,
Partition: common.GetPartitionNameWithoutClusterID(partition.Name),
NodesUtilList: nodesUtilList,
}
}
func getApplicationHistory(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
// There is nothing to return but we did not really encounter a problem
if imHistory == nil {
buildJSONErrorResponse(w, "Internal metrics collection is not enabled.", http.StatusInternalServerError)
return
}
// get a copy of the records: if the array contains nil values they will always be at the
// start and we cannot shortcut the loop using a break, we must finish iterating
records := imHistory.GetRecords()
result := getAppHistoryDAO(records)
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getContainerHistory(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
// There is nothing to return but we did not really encounter a problem
if imHistory == nil {
buildJSONErrorResponse(w, "Internal metrics collection is not enabled.", http.StatusInternalServerError)
return
}
// get a copy of the records: if the array contains nil values they will always be at the
// start and we cannot shortcut the loop using a break, we must finish iterating
records := imHistory.GetRecords()
result := getContainerHistoryDAO(records)
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getClusterConfig(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
var marshalledConf []byte
var err error
conf := getClusterConfigDAO()
// check if we have a request for json output
if r.Header.Get("Accept") == "application/json" {
marshalledConf, err = json.Marshal(&conf)
} else {
w.Header().Set("Content-Type", "application/x-yaml; charset=UTF-8")
marshalledConf, err = yaml.Marshal(&conf)
}
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err = w.Write(marshalledConf); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getClusterConfigDAO() *dao.ConfigDAOInfo {
// merge core config with extra config
conf := dao.ConfigDAOInfo{
SchedulerConfig: configs.ConfigContext.Get(schedulerContext.GetPolicyGroup()),
Extra: configs.GetConfigMap(),
DeadlockDetectionEnabled: locking.IsTrackingEnabled(),
DeadlockTimeoutSeconds: locking.GetDeadlockTimeoutSeconds(),
}
return &conf
}
func checkHealthStatus(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
// Fetch last healthCheck result
result := schedulerContext.GetLastHealthCheckResult()
if result != nil {
if !result.Healthy {
log.Log(log.SchedHealth).Error("Scheduler is not healthy", zap.Any("health check info", *result))
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
} else {
log.Log(log.SchedHealth).Info("Scheduler is healthy", zap.Any("health check info", *result))
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
} else {
log.Log(log.SchedHealth).Info("Health check is not available")
buildJSONErrorResponse(w, "Health check is not available", http.StatusNotFound)
}
}
func getPartitions(w http.ResponseWriter, _ *http.Request) {
writeHeaders(w)
lists := schedulerContext.GetPartitionMapClone()
partitionsInfo := getPartitionInfoDAO(lists)
if err := json.NewEncoder(w).Encode(partitionsInfo); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getPartitionQueues(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partitionName := vars.ByName("partition")
var partitionQueuesDAOInfo dao.PartitionQueueDAOInfo
var partition = schedulerContext.GetPartitionWithoutClusterID(partitionName)
if partition != nil {
partitionQueuesDAOInfo = partition.GetPartitionQueues()
} else {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
return
}
if err := json.NewEncoder(w).Encode(partitionQueuesDAOInfo); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getPartitionQueue(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partition := vars.ByName("partition")
partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
return
}
queueName := vars.ByName("queue")
queueErr := validateQueue(queueName)
if queueErr != nil {
buildJSONErrorResponse(w, queueErr.Error(), http.StatusBadRequest)
return
}
queue := partitionContext.GetQueue(queueName)
if queue == nil {
buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusNotFound)
return
}
queueDao := queue.GetPartitionQueueDAOInfo(r.URL.Query().Has("subtree"))
if err := json.NewEncoder(w).Encode(queueDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getPartitionNodes(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partition := vars.ByName("partition")
partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext != nil {
nodesDao := getNodesDAO(partitionContext.GetNodes())
if err := json.NewEncoder(w).Encode(nodesDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
} else {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
}
}
func getPartitionNode(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partition := vars.ByName("partition")
partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext != nil {
nodeID := vars.ByName("node")
node := partitionContext.GetNode(nodeID)
if node == nil {
buildJSONErrorResponse(w, NodeDoesNotExists, http.StatusNotFound)
return
}
nodeDao := getNodeDAO(node)
if err := json.NewEncoder(w).Encode(nodeDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
} else {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
}
}
func getQueueApplications(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partition := vars.ByName("partition")
queueName := vars.ByName("queue")
queueErr := validateQueue(queueName)
if queueErr != nil {
buildJSONErrorResponse(w, queueErr.Error(), http.StatusBadRequest)
return
}
partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
return
}
queue := partitionContext.GetQueue(queueName)
if queue == nil {
buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusNotFound)
return
}
appsDao := make([]*dao.ApplicationDAOInfo, 0)
for _, app := range queue.GetCopyOfApps() {
appsDao = append(appsDao, getApplicationDAO(app))
}
if err := json.NewEncoder(w).Encode(appsDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getPartitionApplicationsByState(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partition := vars.ByName("partition")
appState := strings.ToLower(vars.ByName("state"))
partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
return
}
var appList []*objects.Application
switch appState {
case "active":
if status := strings.ToLower(r.URL.Query().Get("status")); status != "" {
if !allowedAppActiveStatuses[status] {
buildJSONErrorResponse(w, allowedActiveStatusMsg, http.StatusBadRequest)
return
}
for _, app := range partitionContext.GetApplications() {
if strings.ToLower(app.CurrentState()) == status {
appList = append(appList, app)
}
}
} else {
appList = partitionContext.GetApplications()
}
case "rejected":
appList = partitionContext.GetRejectedApplications()
case "completed":
appList = partitionContext.GetCompletedApplications()
default:
buildJSONErrorResponse(w, "Only following application states are allowed: active, rejected, completed", http.StatusBadRequest)
return
}
appsDao := make([]*dao.ApplicationDAOInfo, 0, len(appList))
for _, app := range appList {
appsDao = append(appsDao, getApplicationDAO(app))
}
if err := json.NewEncoder(w).Encode(appsDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getApplication(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
partition := vars.ByName("partition")
queueName := vars.ByName("queue")
application := vars.ByName("application")
partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
return
}
var app *objects.Application
if len(queueName) == 0 {
app = partitionContext.GetApplication(application)
} else {
queueErr := validateQueue(queueName)
if queueErr != nil {
buildJSONErrorResponse(w, queueErr.Error(), http.StatusBadRequest)
return
}
queue := partitionContext.GetQueue(queueName)
if queue == nil {
buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusNotFound)
return
}
app = queue.GetApplication(application)
}
if app == nil {
buildJSONErrorResponse(w, ApplicationDoesNotExists, http.StatusNotFound)
return
}
appDao := getApplicationDAO(app)
if err := json.NewEncoder(w).Encode(appDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getPartitionInfoDAO(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
result := make([]*dao.PartitionInfo, 0, len(lists))
for _, partitionContext := range lists {
partitionInfo := &dao.PartitionInfo{}
partitionInfo.ClusterID = partitionContext.RmID
partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
partitionInfo.State = partitionContext.GetCurrentState()
partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().UnixNano()
capacityInfo := dao.PartitionCapacity{}
capacity := partitionContext.GetTotalPartitionResource()
usedCapacity := partitionContext.GetAllocatedResource()
capacityInfo.Capacity = capacity.DAOMap()
capacityInfo.UsedCapacity = usedCapacity.DAOMap()
capacityInfo.Utilization = resources.CalculateAbsUsedCapacity(capacity, usedCapacity).DAOMap()
partitionInfo.Capacity = capacityInfo
partitionInfo.NodeSortingPolicy = dao.NodeSortingPolicy{
Type: partitionContext.GetNodeSortingPolicyType().String(),
ResourceWeights: partitionContext.GetNodeSortingResourceWeights(),
}
partitionInfo.TotalNodes = partitionContext.GetTotalNodeCount()
appList := partitionContext.GetApplications()
appList = append(appList, partitionContext.GetCompletedApplications()...)
appList = append(appList, partitionContext.GetRejectedApplications()...)
applicationsState := make(map[string]int)
totalApplications := 0
for _, app := range appList {
applicationsState[app.CurrentState()]++
totalApplications++
}
applicationsState["total"] = totalApplications
partitionInfo.Applications = applicationsState
partitionInfo.TotalContainers = partitionContext.GetTotalAllocationCount()
result = append(result, partitionInfo)
}
return result
}
func getAppHistoryDAO(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
result := make([]*dao.ApplicationHistoryDAOInfo, 0)
for _, record := range records {
if record == nil {
continue
}
element := &dao.ApplicationHistoryDAOInfo{
Timestamp: record.Timestamp.UnixNano(),
TotalApplications: strconv.Itoa(record.TotalApplications),
}
result = append(result, element)
}
return result
}
func getPartitionNodesDAO(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {
result := make([]*dao.NodesDAOInfo, 0, len(lists))
for _, partition := range lists {
nodesDao := getNodesDAO(partition.GetNodes())
result = append(result, &dao.NodesDAOInfo{
PartitionName: common.GetPartitionNameWithoutClusterID(partition.Name),
Nodes: nodesDao,
})
}
return result
}
func getContainerHistoryDAO(records []*history.MetricsRecord) []*dao.ContainerHistoryDAOInfo {
result := make([]*dao.ContainerHistoryDAOInfo, 0)
for _, record := range records {
if record == nil {
continue
}
element := &dao.ContainerHistoryDAOInfo{
Timestamp: record.Timestamp.UnixNano(),
TotalContainers: strconv.Itoa(record.TotalContainers),
}
result = append(result, element)
}
return result
}
func getApplicationsDAO(lists map[string]*scheduler.PartitionContext) []*dao.ApplicationDAOInfo {
result := make([]*dao.ApplicationDAOInfo, 0, len(lists))
for _, partition := range lists {
var appList []*objects.Application
appList = append(appList, partition.GetApplications()...)
appList = append(appList, partition.GetCompletedApplications()...)
appList = append(appList, partition.GetRejectedApplications()...)
for _, app := range appList {
result = append(result, getApplicationDAO(app))
}
}
return result
}
func getPartitionQueuesDAO(lists map[string]*scheduler.PartitionContext) []dao.PartitionQueueDAOInfo {
result := make([]dao.PartitionQueueDAOInfo, 0, len(lists))
for _, partition := range lists {
result = append(result, partition.GetPartitionQueues())
}
return result
}
func getClusterDAO(lists map[string]*scheduler.PartitionContext) []*dao.ClusterDAOInfo {
result := make([]*dao.ClusterDAOInfo, 0, len(lists))
for _, partition := range lists {
result = append(result, getClusterJSON(partition))
}
return result
}
func getRMBuildInformation(lists map[string]*scheduler.RMInformation) []map[string]string {
result := make([]map[string]string, 0, len(lists))
for _, rmInfo := range lists {
result = append(result, rmInfo.RMBuildInformation)
}
return result
}
func getResourceManagerDiagnostics() map[string]interface{} {
result := make(map[string]interface{})
// if the RM has not registered state dump the plugin will be nil
plugin := plugins.GetStateDumpPlugin()
if plugin == nil {
result["empty"] = "Resource Manager did not register callback"
return result
}
// get state dump from RM
dumpStr, err := plugin.GetStateDump()
if err != nil {
// might be not implemented
log.Log(log.REST).Debug("Unable to get RM state dump", zap.Error(err))
result["Error"] = err.Error()
return result
}
// convert to JSON map
if err = json.Unmarshal([]byte(dumpStr), &result); err != nil {
log.Log(log.REST).Warn("Unable to parse RM state dump", zap.Error(err))
result["Error"] = err.Error()
}
return result
}
func getMetrics(w http.ResponseWriter, r *http.Request) {
metrics2.GetRuntimeMetrics().Collect()
promhttp.Handler().ServeHTTP(w, r)
}
func getUsersResourceUsage(w http.ResponseWriter, _ *http.Request) {
writeHeaders(w)
userManager := ugm.GetUserManager()
usersResources := userManager.GetUsersResources()
result := make([]*dao.UserResourceUsageDAOInfo, len(usersResources))
for i, tracker := range usersResources {
result[i] = tracker.GetUserResourceUsageDAOInfo()
}
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getUserResourceUsage(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
user := vars.ByName("user")
if user == "" {
buildJSONErrorResponse(w, UserNameMissing, http.StatusBadRequest)
return
}
userTracker := ugm.GetUserManager().GetUserTracker(user)
if userTracker == nil {
buildJSONErrorResponse(w, UserDoesNotExists, http.StatusNotFound)
return
}
var result = userTracker.GetUserResourceUsageDAOInfo()
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getGroupsResourceUsage(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
userManager := ugm.GetUserManager()
groupsResources := userManager.GetGroupsResources()
result := make([]*dao.GroupResourceUsageDAOInfo, len(groupsResources))
for i, tracker := range groupsResources {
result[i] = tracker.GetGroupResourceUsageDAOInfo()
}
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getGroupResourceUsage(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
if vars == nil {
buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
return
}
group := vars.ByName("group")
if group == "" {
buildJSONErrorResponse(w, GroupNameMissing, http.StatusBadRequest)
return
}
groupTracker := ugm.GetUserManager().GetGroupTracker(group)
if groupTracker == nil {
buildJSONErrorResponse(w, GroupDoesNotExists, http.StatusNotFound)
return
}
var result = groupTracker.GetGroupResourceUsageDAOInfo()
if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getEvents(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
eventSystem := events.GetEventSystem()
if !eventSystem.IsEventTrackingEnabled() {
buildJSONErrorResponse(w, "Event tracking is disabled", http.StatusInternalServerError)
return
}
count := uint64(10000)
var start uint64
if countStr := r.URL.Query().Get("count"); countStr != "" {
var err error
count, err = strconv.ParseUint(countStr, 10, 64)
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
return
}
if count == 0 {
buildJSONErrorResponse(w, `0 is not a valid value for "count"`, http.StatusBadRequest)
return
}
}
if startStr := r.URL.Query().Get("start"); startStr != "" {
var err error
start, err = strconv.ParseUint(startStr, 10, 64)
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
return
}
}
records, lowestID, highestID := eventSystem.GetEventsFromID(start, count)
eventDao := dao.EventRecordDAO{
InstanceUUID: schedulerContext.GetUUID(),
LowestID: lowestID,
HighestID: highestID,
EventRecords: records,
}
if err := json.NewEncoder(w).Encode(eventDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}
func getStream(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
eventSystem := events.GetEventSystem()
if !eventSystem.IsEventTrackingEnabled() {
buildJSONErrorResponse(w, "Event tracking is disabled", http.StatusInternalServerError)
return
}
f, ok := w.(http.Flusher)
if !ok {
buildJSONErrorResponse(w, "Writer does not implement http.Flusher", http.StatusInternalServerError)
return
}
if !streamingLimiter.AddHost(r.Host) {
buildJSONErrorResponse(w, "Too many streaming connections", http.StatusServiceUnavailable)
return
}
defer streamingLimiter.RemoveHost(r.Host)
var count uint64
if countStr := r.URL.Query().Get("count"); countStr != "" {
var err error
count, err = strconv.ParseUint(countStr, 10, 64)
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
return
}
}
rc := http.NewResponseController(w)
// make sure both deadlines can be set
if err := rc.SetWriteDeadline(time.Time{}); err != nil {
log.Log(log.REST).Error("Cannot set write deadline", zap.Error(err))
buildJSONErrorResponse(w, fmt.Sprintf("Cannot set write deadline: %v", err), http.StatusInternalServerError)
return
}
if err := rc.SetReadDeadline(time.Time{}); err != nil {
log.Log(log.REST).Error("Cannot set read deadline", zap.Error(err))
buildJSONErrorResponse(w, fmt.Sprintf("Cannot set read deadline: %v", err), http.StatusInternalServerError)
return
}
enc := json.NewEncoder(w)
stream := eventSystem.CreateEventStream(r.Host, count)
defer eventSystem.RemoveStream(stream)
if err := enc.Encode(dao.YunikornID{
InstanceUUID: schedulerContext.GetUUID(),
}); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
return
}
f.Flush()
// Reading events in an infinite loop until either the client disconnects or Yunikorn closes the channel.
// This results in a persistent HTTP connection where the message body is never closed.
// Write deadline is adjusted before sending data to the client.
for {
select {
case <-r.Context().Done():
log.Log(log.REST).Info("Connection closed for event stream client",
zap.String("host", r.Host))
return
case e, ok := <-stream.Events:
err := rc.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err != nil {
// should not fail at this point
log.Log(log.REST).Error("Cannot set write deadline", zap.Error(err))
buildJSONErrorResponse(w, fmt.Sprintf("Cannot set write deadline: %v", err), http.StatusOK) // status code is already 200 at this point
return
}
if !ok {
// the channel was closed by the event system itself
msg := "Event stream was closed by the producer"
buildJSONErrorResponse(w, msg, http.StatusOK) // status code is 200 at this point, cannot be changed
log.Log(log.REST).Error(msg)
return
}
if err := enc.Encode(e); err != nil {
log.Log(log.REST).Error("Marshalling error",
zap.String("host", r.Host))
buildJSONErrorResponse(w, err.Error(), http.StatusOK) // status code is 200 at this point, cannot be changed
return
}
f.Flush()
}
}
}