blob: 324bf59e243f8fd078693b44aa58b4704e5b3938 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webservice
import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
metrics2 "github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/metrics/history"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/scheduler"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
"github.com/gorilla/mux"
)
// Error details returned to REST clients when a lookup by path parameter fails.
const (
	// PartitionDoesNotExists is reported when the requested partition is unknown.
	PartitionDoesNotExists = "Partition not found"
	// QueueDoesNotExists is reported when the requested queue is unknown within its partition.
	QueueDoesNotExists = "Queue not found"
)
// getStackInfo writes the stack traces of all goroutines to the response body.
// The capture buffer starts at 1 KiB and is doubled until runtime.Stack returns
// fewer bytes than the buffer holds, so the dump is never truncated.
// NOTE(review): writeHeaders advertises JSON although the body is plain text.
func getStackInfo(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	size := 1024
	var dump []byte
	for {
		buf := make([]byte, size)
		n := runtime.Stack(buf, true)
		if n < size {
			dump = buf[:n]
			break
		}
		size *= 2
	}
	if _, err := w.Write(dump); err != nil {
		log.Logger().Error("GetStackInfo error", zap.Error(err))
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getClusterInfo responds with one cluster summary per partition known to the
// scheduler context, encoded as a JSON array.
func getClusterInfo(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	partitions := schedulerContext.GetPartitionMapClone()
	summaries := getClusterDAO(partitions)
	encoder := json.NewEncoder(w)
	if err := encoder.Encode(summaries); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// validateQueue checks every dot-separated element of queuePath against
// configs.QueueNameRegExp. An empty path is accepted and yields nil.
// Otherwise the first element that does not match is reported in the error.
func validateQueue(queuePath string) error {
	if queuePath == "" {
		return nil
	}
	for _, name := range strings.Split(queuePath, ".") {
		if configs.QueueNameRegExp.MatchString(name) {
			continue
		}
		return fmt.Errorf("problem in queue query parameter parsing as queue param "+
			"%s contains invalid queue name %s. Queue name must only have "+
			"alphanumeric characters, - or _, and be no longer than 64 characters", queuePath, name)
	}
	return nil
}
// validateConf reads a scheduler configuration from the request body and
// reports whether it loads. A read or parse failure is not an HTTP error.
// It is returned as Allowed=false, with the failure text as Reason.
func validateConf(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	body, err := io.ReadAll(r.Body)
	if err == nil {
		_, err = configs.LoadSchedulerConfigFromByteArray(body)
	}
	var result dao.ValidateConfResponse
	result.Allowed = err == nil
	if !result.Allowed {
		result.Reason = err.Error()
	}
	if err = json.NewEncoder(w).Encode(result); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// writeHeaders sets the JSON content type and the permissive CORS headers that
// all REST handlers share. net/http ignores header changes made after the
// status code or body has been written, so call this first.
func writeHeaders(w http.ResponseWriter) {
	header := w.Header()
	for _, kv := range [...][2]string{
		{"Content-Type", "application/json; charset=UTF-8"},
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Credentials", "true"},
		{"Access-Control-Allow-Methods", "GET,POST,HEAD,OPTIONS"},
		{"Access-Control-Allow-Headers", "X-Requested-With,Content-Type,Accept,Origin"},
	} {
		header.Set(kv[0], kv[1])
	}
}
// buildJSONErrorResponse writes the status code and then a JSON-encoded
// dao.YAPIError that carries code and detail. Any headers (see writeHeaders)
// must be set before this call, because they cannot change after WriteHeader.
// An encoding failure cannot be reported to the client at that point, so it is
// logged together with the original detail.
func buildJSONErrorResponse(w http.ResponseWriter, detail string, code int) {
	w.WriteHeader(code)
	errorInfo := dao.NewYAPIError(nil, code, detail)
	if jsonErr := json.NewEncoder(w).Encode(errorInfo); jsonErr != nil {
		log.Logger().Error(fmt.Sprintf("Problem in sending error response in JSON format. Error response: %s", detail),
			zap.Error(jsonErr))
	}
}
// getClusterJSON builds the cluster summary for a single partition. Counts are
// rendered as decimal strings and the start time as Unix nanoseconds.
// NOTE(review): the Running*/Active* fields report the same totals as the
// Total* fields, and ClusterName is hard-coded; confirm that this is intended.
func getClusterJSON(partition *scheduler.PartitionContext) *dao.ClusterDAOInfo {
	return &dao.ClusterDAOInfo{
		StartTime:           schedulerContext.GetStartTime().UnixNano(),
		RMBuildInformation:  getRMBuildInformation(schedulerContext.GetRMInfoMapClone()),
		PartitionName:       common.GetPartitionNameWithoutClusterID(partition.Name),
		TotalApplications:   strconv.Itoa(partition.GetTotalApplicationCount()),
		TotalContainers:     strconv.Itoa(partition.GetTotalAllocationCount()),
		TotalNodes:          strconv.Itoa(partition.GetTotalNodeCount()),
		ClusterName:         "kubernetes",
		RunningApplications: strconv.Itoa(partition.GetTotalApplicationCount()),
		RunningContainers:   strconv.Itoa(partition.GetTotalAllocationCount()),
		ActiveNodes:         strconv.Itoa(partition.GetTotalNodeCount()),
	}
}
// getClusterUtilJSON reports one entry per resource type with the partition
// total, the allocated amount and the usage percentage (truncated to an
// integer, suffixed with "%").
// If the partition total is zero or nothing is allocated, it returns a single
// "N/A" placeholder entry whose Total and Used are -1.
func getClusterUtilJSON(partition *scheduler.PartitionContext) []*dao.ClusterUtilDAOInfo {
	total := partition.GetTotalPartitionResource()
	used := partition.GetAllocatedResource()
	if resources.IsZero(total) || len(used.Resources) == 0 {
		return []*dao.ClusterUtilDAOInfo{{
			ResourceType: "N/A",
			Total:        int64(-1),
			Used:         int64(-1),
			Usage:        "N/A",
		}}
	}
	percent := resources.CalculateAbsUsedCapacity(total, used)
	// Keep a nil slice when there is nothing to report (JSON null), as before.
	var utils []*dao.ClusterUtilDAOInfo
	for name, value := range percent.Resources {
		utils = append(utils, &dao.ClusterUtilDAOInfo{
			ResourceType: name,
			Total:        int64(total.Resources[name]),
			Used:         int64(used.Resources[name]),
			Usage:        strconv.FormatInt(int64(value), 10) + "%",
		})
	}
	return utils
}
// getApplicationJSON converts an application into its REST DAO form. The DAO
// includes allocations, pending requests, placeholder (task group) data, the
// state transition log and reservations. Timestamps are Unix nanoseconds.
// Every Partition field in the DAO has the cluster ID removed.
func getApplicationJSON(app *objects.Application) *dao.ApplicationDAOInfo {
	allocations := app.GetAllAllocations()
	allocationInfo := make([]dao.AllocationDAOInfo, 0, len(allocations))
	placeholders := app.GetAllPlaceholderData()
	placeholderInfo := make([]dao.PlaceholderDAOInfo, 0, len(placeholders))
	for _, alloc := range allocations {
		// If the allocation replaced a placeholder, measure the delay from the
		// placeholder's creation instead of from the real ask's creation.
		var requestTime int64
		if alloc.IsPlaceholderUsed() {
			requestTime = alloc.GetPlaceholderCreateTime().UnixNano()
		} else {
			requestTime = alloc.GetAsk().GetCreateTime().UnixNano()
		}
		allocTime := alloc.GetCreateTime().UnixNano()
		allocationInfo = append(allocationInfo, dao.AllocationDAOInfo{
			AllocationKey:    alloc.GetAllocationKey(),
			AllocationTags:   alloc.GetTagsClone(),
			RequestTime:      requestTime,
			AllocationTime:   allocTime,
			AllocationDelay:  allocTime - requestTime,
			UUID:             alloc.GetUUID(),
			ResourcePerAlloc: alloc.GetAllocatedResource().DAOMap(),
			PlaceholderUsed:  alloc.IsPlaceholderUsed(),
			Priority:         strconv.Itoa(int(alloc.GetPriority())),
			NodeID:           alloc.GetNodeID(),
			ApplicationID:    alloc.GetApplicationID(),
			// Strip the cluster ID, matching the application and request Partition fields.
			Partition: common.GetPartitionNameWithoutClusterID(alloc.GetPartitionName()),
		})
	}
	stateLog := app.GetStateLog()
	stateLogInfo := make([]dao.StateDAOInfo, 0, len(stateLog))
	for _, state := range stateLog {
		stateLogInfo = append(stateLogInfo, dao.StateDAOInfo{
			Time:             state.Time.UnixNano(),
			ApplicationState: state.ApplicationState,
		})
	}
	for _, taskGroup := range placeholders {
		placeholderInfo = append(placeholderInfo, dao.PlaceholderDAOInfo{
			TaskGroupName: taskGroup.TaskGroupName,
			Count:         taskGroup.Count,
			MinResource:   taskGroup.MinResource.DAOMap(),
			Replaced:      taskGroup.Replaced,
			TimedOut:      taskGroup.TimedOut,
		})
	}
	return &dao.ApplicationDAOInfo{
		ApplicationID:   app.ApplicationID,
		UsedResource:    app.GetAllocatedResource().DAOMap(),
		MaxUsedResource: app.GetMaxAllocatedResource().DAOMap(),
		Partition:       common.GetPartitionNameWithoutClusterID(app.Partition),
		QueueName:       app.GetQueuePath(),
		SubmissionTime:  app.SubmissionTime.UnixNano(),
		FinishedTime:    common.ZeroTimeInUnixNano(app.FinishedTime()),
		Requests:        getApplicationRequests(app),
		Allocations:     allocationInfo,
		State:           app.CurrentState(),
		User:            app.GetUser().User,
		RejectedMessage: app.GetRejectedMessage(),
		PlaceholderData: placeholderInfo,
		StateLog:        stateLogInfo,
		HasReserved:     app.HasReserved(),
		Reservations:    app.GetReservations(),
	}
}
// getApplicationRequests returns the DAO form of every request of app that still
// has pending repeats (GetPendingAskRepeat() > 0). Each request's allocation log
// is ordered by LastOccurrence, oldest first. The result is never nil, so it
// encodes as [] when there are no pending requests.
func getApplicationRequests(app *objects.Application) []dao.AllocationAskDAOInfo {
	requests := app.GetAllRequests()
	requestInfo := make([]dao.AllocationAskDAOInfo, 0, len(requests))
	for _, req := range requests {
		count := req.GetPendingAskRepeat()
		if count <= 0 {
			continue
		}
		allocLog := req.GetAllocationLog()
		sort.SliceStable(allocLog, func(i, j int) bool {
			return allocLog[i].LastOccurrence.Before(allocLog[j].LastOccurrence)
		})
		allocLogInfo := make([]dao.AllocationAskLogDAOInfo, len(allocLog))
		// Named "entry" rather than "log" so it does not shadow the imported log package.
		for i, entry := range allocLog {
			allocLogInfo[i] = dao.AllocationAskLogDAOInfo{
				Message:        entry.Message,
				LastOccurrence: entry.LastOccurrence.UnixNano(),
				Count:          entry.Count,
			}
		}
		requestInfo = append(requestInfo, dao.AllocationAskDAOInfo{
			AllocationKey:      req.GetAllocationKey(),
			AllocationTags:     req.GetTagsClone(),
			RequestTime:        req.GetCreateTime().UnixNano(),
			ResourcePerAlloc:   req.GetAllocatedResource().DAOMap(),
			PendingCount:       count,
			Priority:           strconv.Itoa(int(req.GetPriority())),
			RequiredNodeID:     req.GetRequiredNode(),
			ApplicationID:      req.GetApplicationID(),
			Partition:          common.GetPartitionNameWithoutClusterID(req.GetPartitionName()),
			Placeholder:        req.IsPlaceholder(),
			PlaceholderTimeout: req.GetTimeout().Nanoseconds(),
			TaskGroupName:      req.GetTaskGroup(),
			AllocationLog:      allocLogInfo,
		})
	}
	return requestInfo
}
// getNodeJSON converts a node and the allocations placed on it into the REST
// DAO form. Resources are rendered through DAOMap and timestamps as Unix
// nanoseconds. Allocation partition names have the cluster ID removed.
func getNodeJSON(node *objects.Node) *dao.NodeDAOInfo {
	nodeAllocs := node.GetAllAllocations()
	allocations := make([]*dao.AllocationDAOInfo, 0, len(nodeAllocs))
	for _, alloc := range nodeAllocs {
		// If the allocation replaced a placeholder, measure the delay from the
		// placeholder's creation instead of from the real ask's creation.
		var requestTime int64
		if alloc.IsPlaceholderUsed() {
			requestTime = alloc.GetPlaceholderCreateTime().UnixNano()
		} else {
			requestTime = alloc.GetAsk().GetCreateTime().UnixNano()
		}
		allocTime := alloc.GetCreateTime().UnixNano()
		allocations = append(allocations, &dao.AllocationDAOInfo{
			AllocationKey:    alloc.GetAllocationKey(),
			AllocationTags:   alloc.GetTagsClone(),
			RequestTime:      requestTime,
			AllocationTime:   allocTime,
			AllocationDelay:  allocTime - requestTime,
			UUID:             alloc.GetUUID(),
			ResourcePerAlloc: alloc.GetAllocatedResource().DAOMap(),
			Priority:         strconv.Itoa(int(alloc.GetPriority())),
			NodeID:           alloc.GetNodeID(),
			ApplicationID:    alloc.GetApplicationID(),
			PlaceholderUsed:  alloc.IsPlaceholderUsed(),
			// Strip the cluster ID, matching every other Partition field in the REST output.
			Partition: common.GetPartitionNameWithoutClusterID(alloc.GetPartitionName()),
		})
	}
	return &dao.NodeDAOInfo{
		NodeID:       node.NodeID,
		HostName:     node.Hostname,
		RackName:     node.Rackname,
		Capacity:     node.GetCapacity().DAOMap(),
		Occupied:     node.GetOccupiedResource().DAOMap(),
		Allocated:    node.GetAllocatedResource().DAOMap(),
		Available:    node.GetAvailableResource().DAOMap(),
		Utilized:     node.GetUtilizedResource().DAOMap(),
		Allocations:  allocations,
		Schedulable:  node.IsSchedulable(),
		IsReserved:   node.IsReserved(),
		Reservations: node.GetReservations(),
	}
}
// getNodesUtilJSON builds a histogram of node utilization for one resource type.
// Nodes go into ten 10%-wide buckets ("0-10%" ... "90-100%"). Nodes with zero
// or negative capacity for the resource, or without an allocated entry for it,
// are skipped. Each bucket reports its node count and node IDs.
func getNodesUtilJSON(partition *scheduler.PartitionContext, name string) *dao.NodesUtilDAOInfo {
	const numBuckets = 10
	counts := make([]int, numBuckets)
	nodeNames := make([][]string, numBuckets)
	for _, node := range partition.GetNodes() {
		total := node.GetCapacity()
		if total.Resources[name] <= 0 {
			continue
		}
		allocated := node.GetAllocatedResource()
		if _, ok := allocated.Resources[name]; !ok {
			continue
		}
		v := float64(resources.CalculateAbsUsedCapacity(total, allocated).Resources[name])
		// Maps 0 and (0,10] -> 0, (10,20] -> 1, ..., (90,100] -> 9.
		idx := int(math.Dim(math.Ceil(v/10), 1))
		// Any value above 100% (presumably possible when allocations exceed
		// capacity) would give idx >= 10 and panic on the index below.
		// Clamp into the top bucket instead.
		if idx >= numBuckets {
			idx = numBuckets - 1
		} else if idx < 0 {
			idx = 0
		}
		counts[idx]++
		nodeNames[idx] = append(nodeNames[idx], node.NodeID)
	}
	nodeUtil := make([]*dao.NodeUtilDAOInfo, 0, numBuckets)
	for k := 0; k < numBuckets; k++ {
		nodeUtil = append(nodeUtil, &dao.NodeUtilDAOInfo{
			BucketName: fmt.Sprintf("%d-%d%%", k*10, (k+1)*10),
			NumOfNodes: int64(counts[k]),
			NodeNames:  nodeNames[k],
		})
	}
	return &dao.NodesUtilDAOInfo{
		ResourceType: name,
		NodesUtil:    nodeUtil,
	}
}
// getApplicationHistory responds with the recorded total-application counts
// over time. If internal metrics history is disabled (imHistory is nil), it
// responds with 501.
func getApplicationHistory(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	if imHistory == nil {
		// Not a failure: history collection is simply switched off.
		buildJSONErrorResponse(w, "Internal metrics collection is not enabled.", http.StatusNotImplemented)
		return
	}
	// GetRecords hands back a copy. Nil entries (always at the start) are
	// skipped by the DAO conversion, which must walk the whole slice.
	history := getAppHistoryDAO(imHistory.GetRecords())
	if err := json.NewEncoder(w).Encode(history); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getContainerHistory responds with the recorded total-container counts over
// time. If internal metrics history is disabled (imHistory is nil), it
// responds with 501.
func getContainerHistory(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	if imHistory == nil {
		// Not a failure: history collection is simply switched off.
		buildJSONErrorResponse(w, "Internal metrics collection is not enabled.", http.StatusNotImplemented)
		return
	}
	// GetRecords hands back a copy. Nil entries (always at the start) are
	// skipped by the DAO conversion, which must walk the whole slice.
	history := getContainerHistoryDAO(imHistory.GetRecords())
	if err := json.NewEncoder(w).Encode(history); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getClusterConfig returns the active configuration for the scheduler's
// policy group. The body is JSON when the request's Accept header is exactly
// "application/json", and YAML (with a YAML content type) otherwise.
// A marshalling failure ends the request with a JSON error response.
func getClusterConfig(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	conf := configs.ConfigContext.Get(schedulerContext.GetPolicyGroup())
	var marshalledConf []byte
	var err error
	// check if we have a request for json output
	if r.Header.Get("Accept") == "application/json" {
		marshalledConf, err = json.Marshal(&conf)
	} else {
		w.Header().Set("Content-Type", "application/x-yaml; charset=UTF-8")
		marshalledConf, err = yaml.Marshal(&conf)
	}
	if err != nil {
		// The error body is JSON, so undo any YAML content type set above.
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
		// Stop here: the error response has been written.
		return
	}
	if _, err = w.Write(marshalledConf); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// createClusterConfig validates a configuration posted in the request body
// without applying it. Only dry-run mode is supported, so the query must carry
// dry_run=1; otherwise the request fails with 400. The validation result is
// returned like validateConf's: Allowed, plus Reason when rejected.
func createClusterConfig(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	dryRun, present := r.URL.Query()["dry_run"]
	switch {
	case !present:
		buildJSONErrorResponse(w, "Dry run param is missing. Please check the usage documentation", http.StatusBadRequest)
		return
	case dryRun[0] != "1":
		buildJSONErrorResponse(w, "Invalid \"dry_run\" query param. Currently, only dry_run=1 is supported. Please check the usage documentation", http.StatusBadRequest)
		return
	}
	body, err := io.ReadAll(r.Body)
	if err == nil {
		_, err = configs.LoadSchedulerConfigFromByteArray(body)
	}
	var result dao.ValidateConfResponse
	result.Allowed = err == nil
	if !result.Allowed {
		result.Reason = err.Error()
	}
	if err = json.NewEncoder(w).Encode(result); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// updateClusterConfig replaces the scheduler configuration with the one in the
// request body. The whole update runs while holding the package-level lock.
// Steps: parse and validate the body; reject it if its base checksum no longer
// matches the active configuration; push the new text through the RM plugin;
// then apply it to the scheduler context. If the last step fails, the plugin
// side is rolled back to the previous configuration. Every outcome is reported
// through buildUpdateResponse.
func updateClusterConfig(w http.ResponseWriter, r *http.Request) {
	lock.Lock()
	defer lock.Unlock()
	writeHeaders(w)
	fail := func(cause error) { buildUpdateResponse(cause, w) }

	requestBytes, err := io.ReadAll(r.Body)
	if err != nil {
		fail(err)
		return
	}
	newConf, err := configs.ParseAndValidateConfig(requestBytes)
	if err != nil {
		fail(err)
		return
	}
	if !isChecksumEqual(newConf.Checksum) {
		fail(fmt.Errorf("the base configuration is changed"))
		return
	}
	configs.SetChecksum(requestBytes, newConf)
	// The plugin path assumes a single RM; multiple RMs are not supported here.
	oldConf, err := updateConfiguration(configs.GetConfigurationString(requestBytes))
	if err != nil {
		fail(err)
		return
	}
	// Fails when no RM, or more than one RM, is registered.
	if err = schedulerContext.UpdateSchedulerConfig(newConf); err != nil {
		// Revert the configmap change made through the plugin.
		if _, rollbackErr := updateConfiguration(oldConf); rollbackErr != nil {
			err = fmt.Errorf("update failed: %s\nupdate rollback failed: %s", err.Error(), rollbackErr.Error())
		}
		fail(err)
		return
	}
	buildUpdateResponse(nil, w)
}
// isChecksumEqual reports whether checksum equals the checksum of the
// configuration currently held for the scheduler's policy group.
func isChecksumEqual(checksum string) bool {
	current := configs.ConfigContext.Get(schedulerContext.GetPolicyGroup())
	return checksum == current.Checksum
}
// checkHealthStatus reports the most recent health check result.
// - No result recorded yet: 404.
// - Unhealthy result: 503.
// - Healthy result: the result itself, encoded as JSON.
func checkHealthStatus(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	result := schedulerContext.GetLastHealthCheckResult()
	if result == nil {
		log.Logger().Info("The healthy status of scheduler is not found", zap.Any("health check info", ""))
		buildJSONErrorResponse(w, "The healthy status of scheduler is not found", http.StatusNotFound)
		return
	}
	if !result.Healthy {
		log.Logger().Error("Scheduler is not healthy", zap.Any("health check info", *result))
		buildJSONErrorResponse(w, "Scheduler is not healthy", http.StatusServiceUnavailable)
		return
	}
	log.Logger().Info("Scheduler is healthy", zap.Any("health check info", *result))
	if err := json.NewEncoder(w).Encode(result); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
func buildUpdateResponse(err error, w http.ResponseWriter) {
if err == nil {
w.WriteHeader(http.StatusOK)
if _, err = w.Write([]byte("Configuration updated successfully")); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
} else {
log.Logger().Info("Configuration update failed with errors",
zap.Error(err))
buildJSONErrorResponse(w, err.Error(), http.StatusConflict)
}
}
// updateConfiguration sends conf to the resource manager callback plugin (used
// to update the configuration in the configMap). It returns the previous
// configuration text as reported by the plugin. An error is returned when no
// plugin is registered, or when the plugin reports failure; in the failure
// case the returned error carries the plugin's Reason.
func updateConfiguration(conf string) (string, error) {
	plugin := plugins.GetResourceManagerCallbackPlugin()
	if plugin == nil {
		return "", fmt.Errorf("config plugin not found")
	}
	resp := plugin.UpdateConfiguration(&si.UpdateConfigurationRequest{
		Configs: conf,
	})
	if resp.Success {
		return resp.OldConfig, nil
	}
	// Reason is free text from the plugin. Never use it as a format string,
	// or any '%' in it would be misinterpreted.
	return resp.OldConfig, fmt.Errorf("%s", resp.Reason)
}
// getPartitions responds with the summary (state, capacity, node sorting
// policy and application counts) of every partition, as a JSON array.
func getPartitions(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	summaries := getPartitionInfoDAO(schedulerContext.GetPartitionMapClone())
	encoder := json.NewEncoder(w)
	if err := encoder.Encode(summaries); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getPartitionQueues responds with the queue hierarchy of the partition named
// by the "partition" path variable. If the partition is unknown it responds
// with 400.
func getPartitionQueues(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	writeHeaders(w)
	partition := schedulerContext.GetPartitionWithoutClusterID(vars["partition"])
	if partition == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	queues := partition.GetPartitionQueues()
	if err := json.NewEncoder(w).Encode(queues); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getPartitionNodes responds with every node of the partition named by the
// "partition" path variable, including each node's allocations. If the
// partition is unknown it responds with 400.
func getPartitionNodes(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	writeHeaders(w)
	partitionContext := schedulerContext.GetPartitionWithoutClusterID(vars["partition"])
	if partitionContext == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	nodes := partitionContext.GetNodes()
	nodesDao := make([]*dao.NodeDAOInfo, 0, len(nodes))
	for _, node := range nodes {
		nodesDao = append(nodesDao, getNodeJSON(node))
	}
	if err := json.NewEncoder(w).Encode(nodesDao); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getQueueApplications responds with all applications of one queue: active
// ones first, then completed ones. It responds with 400 when:
// - the queue path is malformed,
// - the partition is unknown, or
// - the queue does not exist.
func getQueueApplications(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	writeHeaders(w)
	partitionName, queueName := vars["partition"], vars["queue"]
	if err := validateQueue(queueName); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
		return
	}
	partitionContext := schedulerContext.GetPartitionWithoutClusterID(partitionName)
	if partitionContext == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	queue := partitionContext.GetQueue(queueName)
	if queue == nil {
		buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusBadRequest)
		return
	}
	active := queue.GetCopyOfApps()
	completed := queue.GetCopyOfCompletedApps()
	appsDao := make([]*dao.ApplicationDAOInfo, 0, len(active)+len(completed))
	add := func(app *objects.Application) {
		appsDao = append(appsDao, getApplicationJSON(app))
	}
	for _, app := range active {
		add(app)
	}
	for _, app := range completed {
		add(app)
	}
	if err := json.NewEncoder(w).Encode(appsDao); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getApplication responds with a single application looked up by the
// "partition", "queue" and "application" path variables. It responds with 400
// when:
// - the queue path is malformed, or
// - the partition, queue or application cannot be found.
func getApplication(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	writeHeaders(w)
	partitionName, queueName, appID := vars["partition"], vars["queue"], vars["application"]
	if err := validateQueue(queueName); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
		return
	}
	partitionContext := schedulerContext.GetPartitionWithoutClusterID(partitionName)
	if partitionContext == nil {
		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusBadRequest)
		return
	}
	queue := partitionContext.GetQueue(queueName)
	if queue == nil {
		buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusBadRequest)
		return
	}
	app := queue.GetApplication(appID)
	if app == nil {
		buildJSONErrorResponse(w, "Application not found", http.StatusBadRequest)
		return
	}
	if err := json.NewEncoder(w).Encode(getApplicationJSON(app)); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// setLogLevel changes the logger level to the "level" path variable.
// It responds with 400 if log.SetLogLevel rejects the value.
func setLogLevel(w http.ResponseWriter, r *http.Request) {
	level := mux.Vars(r)["level"]
	writeHeaders(w)
	err := log.SetLogLevel(level)
	if err == nil {
		return
	}
	buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
}
// getLogLevel writes the current logger level name (as a plain string) to the
// response body.
func getLogLevel(w http.ResponseWriter, r *http.Request) {
	writeHeaders(w)
	level := log.GetConfig().Level.Level().String()
	if _, err := w.Write([]byte(level)); err != nil {
		log.Logger().Error("Could not get log level", zap.Error(err))
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
// getPartitionInfoDAO builds a summary per partition with:
// - cluster ID and name,
// - state and last transition time,
// - capacity, usage and utilization,
// - node sorting policy, and
// - a per-state count of active and completed applications.
// The "total" key holds the number of counted applications. Output order
// follows map iteration order and is therefore unspecified.
func getPartitionInfoDAO(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
	var result []*dao.PartitionInfo
	for _, pc := range lists {
		capacity := pc.GetTotalPartitionResource()
		usedCapacity := pc.GetAllocatedResource()

		apps := append(pc.GetApplications(), pc.GetCompletedApplications()...)
		appStates := make(map[string]int)
		for _, app := range apps {
			appStates[app.CurrentState()]++
		}
		appStates["total"] = len(apps)

		result = append(result, &dao.PartitionInfo{
			ClusterID:               pc.RmID,
			Name:                    common.GetPartitionNameWithoutClusterID(pc.Name),
			State:                   pc.GetCurrentState(),
			LastStateTransitionTime: pc.GetStateTime().UnixNano(),
			Capacity: dao.PartitionCapacity{
				Capacity:     capacity.DAOMap(),
				UsedCapacity: usedCapacity.DAOMap(),
				Utilization:  resources.CalculateAbsUsedCapacity(capacity, usedCapacity).DAOMap(),
			},
			NodeSortingPolicy: dao.NodeSortingPolicy{
				Type:            pc.GetNodeSortingPolicyType().String(),
				ResourceWeights: pc.GetNodeSortingResourceWeights(),
			},
			Applications: appStates,
		})
	}
	return result
}
// getAppHistoryDAO converts metrics records into timestamped application
// totals. Nil records (unfilled slots) are skipped. They are expected only at
// the start, but the whole slice is still walked. The result is nil when no
// record is set.
func getAppHistoryDAO(records []*history.MetricsRecord) []*dao.ApplicationHistoryDAOInfo {
	var result []*dao.ApplicationHistoryDAOInfo
	for i := range records {
		if rec := records[i]; rec != nil {
			result = append(result, &dao.ApplicationHistoryDAOInfo{
				Timestamp:         rec.Timestamp.UnixNano(),
				TotalApplications: strconv.Itoa(rec.TotalApplications),
			})
		}
	}
	return result
}
// getNodesDAO groups the DAO form of every node by partition. Partition names
// have the cluster ID removed. The result is nil when lists is empty.
func getNodesDAO(lists map[string]*scheduler.PartitionContext) []*dao.NodesDAOInfo {
	var result []*dao.NodesDAOInfo
	for _, partition := range lists {
		nodes := partition.GetNodes()
		info := &dao.NodesDAOInfo{
			PartitionName: common.GetPartitionNameWithoutClusterID(partition.Name),
			Nodes:         make([]*dao.NodeDAOInfo, 0, len(nodes)),
		}
		for _, node := range nodes {
			info.Nodes = append(info.Nodes, getNodeJSON(node))
		}
		result = append(result, info)
	}
	return result
}
// getContainerHistoryDAO converts metrics records into timestamped container
// totals. Nil records (unfilled slots) are skipped. They are expected only at
// the start, but the whole slice is still walked. The result is nil when no
// record is set.
func getContainerHistoryDAO(records []*history.MetricsRecord) []*dao.ContainerHistoryDAOInfo {
	var result []*dao.ContainerHistoryDAOInfo
	for i := range records {
		if rec := records[i]; rec != nil {
			result = append(result, &dao.ContainerHistoryDAOInfo{
				Timestamp:       rec.Timestamp.UnixNano(),
				TotalContainers: strconv.Itoa(rec.TotalContainers),
			})
		}
	}
	return result
}
// getApplicationsDAO returns the DAO form of the applications in every
// partition. Within each partition the order is active, then completed, then
// rejected. The result is nil when there are no applications at all.
func getApplicationsDAO(lists map[string]*scheduler.PartitionContext) []*dao.ApplicationDAOInfo {
	var result []*dao.ApplicationDAOInfo
	for _, partition := range lists {
		groups := [][]*objects.Application{
			partition.GetApplications(),
			partition.GetCompletedApplications(),
			partition.GetRejectedApplications(),
		}
		for _, group := range groups {
			for _, app := range group {
				result = append(result, getApplicationJSON(app))
			}
		}
	}
	return result
}
// getPartitionQueuesDAO collects the queue hierarchy of every partition.
// The result is nil when lists is empty.
func getPartitionQueuesDAO(lists map[string]*scheduler.PartitionContext) []dao.PartitionQueueDAOInfo {
	var result []dao.PartitionQueueDAOInfo
	for key := range lists {
		queues := lists[key].GetPartitionQueues()
		result = append(result, queues)
	}
	return result
}
// getClusterDAO builds one cluster summary per partition.
// The result is nil when lists is empty.
func getClusterDAO(lists map[string]*scheduler.PartitionContext) []*dao.ClusterDAOInfo {
	var result []*dao.ClusterDAOInfo
	for key := range lists {
		summary := getClusterJSON(lists[key])
		result = append(result, summary)
	}
	return result
}
// getRMBuildInformation collects the build information map of every registered
// resource manager. The result is nil when lists is empty.
func getRMBuildInformation(lists map[string]*scheduler.RMInformation) []map[string]string {
	var result []map[string]string
	for key := range lists {
		buildInfo := lists[key].RMBuildInformation
		result = append(result, buildInfo)
	}
	return result
}
// getMetrics refreshes the runtime metrics and then serves the Prometheus
// exposition through promhttp's default handler. A fresh handler is obtained
// on every request.
func getMetrics(w http.ResponseWriter, r *http.Request) {
	metrics2.GetRuntimeMetrics().Collect()
	handler := promhttp.Handler()
	handler.ServeHTTP(w, r)
}