blob: c68ab3102fe823057fed934a1a286aa846e07aca [file] [log] [blame]
package cdn
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"crypto/tls"
"database/sql"
"encoding/json"
"errors"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/dbhelpers"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/util/monitorhlp"
)
func GetCapacity(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
api.RespWriter(w, r, inf.Tx.Tx)(getCapacity(inf.Tx.Tx))
}
const MonitorProxyParameter = "tm.traffic_mon_fwd_proxy"
const MonitorRequestTimeout = time.Second * 10
const MonitorOnlineStatus = "ONLINE"
func getCapacity(tx *sql.Tx) (CapacityResp, error) {
monitors, err := getCDNMonitorFQDNs(tx)
if err != nil {
return CapacityResp{}, errors.New("getting monitors: " + err.Error())
}
return getMonitorsCapacity(tx, monitors)
}
type CapacityResp struct {
AvailablePercent float64 `json:"availablePercent"`
UnavailablePercent float64 `json:"unavailablePercent"`
UtilizedPercent float64 `json:"utilizedPercent"`
MaintenancePercent float64 `json:"maintenancePercent"`
}
type CapData struct {
Available float64
Unavailable float64
Utilized float64
Maintenance float64
Capacity float64
}
func getMonitorsCapacity(tx *sql.Tx, monitors map[tc.CDNName][]string) (CapacityResp, error) {
monitorForwardProxy, monitorForwardProxyExists, err := dbhelpers.GetGlobalParam(tx, MonitorProxyParameter)
if err != nil {
return CapacityResp{}, errors.New("getting global monitor proxy parameter: " + err.Error())
}
client := &http.Client{Timeout: MonitorRequestTimeout}
if monitorForwardProxyExists {
proxyURI, err := url.Parse(monitorForwardProxy)
if err != nil {
return CapacityResp{}, errors.New("monitor forward proxy '" + monitorForwardProxy + "' in parameter '" + MonitorProxyParameter + "' not a URI: " + err.Error())
}
clientTransport := &http.Transport{Proxy: http.ProxyURL(proxyURI)}
if proxyURI.Scheme == "https" {
// TM does not support HTTP/2 and golang when connecting to https will use HTTP/2 by default causing a conflict
// The result will be an unsupported scheme error
// Setting TLSNextProto to any empty map will disable using HTTP/2 per https://golang.org/src/net/http/doc.go
clientTransport.TLSNextProto = make(map[string]func(authority string, c *tls.Conn) http.RoundTripper)
}
client = &http.Client{Timeout: MonitorRequestTimeout, Transport: clientTransport}
}
thresholds, err := getEdgeProfileHealthThresholdBandwidth(tx)
if err != nil {
return CapacityResp{}, errors.New("getting profile thresholds: " + err.Error())
}
cap, err := getCapacityData(monitors, thresholds, client)
if err != nil {
return CapacityResp{}, errors.New("getting capacity from monitors: " + err.Error())
} else if cap.Capacity == 0 {
return CapacityResp{}, errors.New("capacity was zero!") // avoid divide-by-zero below.
}
return CapacityResp{
UtilizedPercent: (cap.Available * 100) / cap.Capacity,
UnavailablePercent: (cap.Unavailable * 100) / cap.Capacity,
MaintenancePercent: (cap.Maintenance * 100) / cap.Capacity,
AvailablePercent: ((cap.Capacity - cap.Unavailable - cap.Maintenance - cap.Available) * 100) / cap.Capacity,
}, nil
}
// getCapacityData attempts to get the CDN capacity from each monitor. If one fails, it tries the next.
// The first monitor for which all data requests succeed is used.
// Only if all monitors for a CDN fail is an error returned, from the last monitor tried.
func getCapacityData(monitors map[tc.CDNName][]string, thresholds map[string]float64, client *http.Client) (CapData, error) {
cap := CapData{}
for cdn, monitorFQDNs := range monitors {
err := error(nil)
for _, monitorFQDN := range monitorFQDNs {
crStates := tc.CRStates{}
crConfig := tc.CRConfig{}
cacheStats := CacheStats{}
if crStates, err = monitorhlp.GetCRStates(monitorFQDN, client); err != nil {
err = errors.New("getting CRStates for CDN '" + string(cdn) + "' monitor '" + monitorFQDN + "': " + err.Error())
log.Warnln("getCapacity failed to get CRStates from cdn '" + string(cdn) + " monitor '" + monitorFQDN + "', trying next monitor: " + err.Error())
continue
}
if crConfig, err = monitorhlp.GetCRConfig(monitorFQDN, client); err != nil {
err = errors.New("getting CRConfig for CDN '" + string(cdn) + "' monitor '" + monitorFQDN + "': " + err.Error())
log.Warnln("getCapacity failed to get CRConfig from cdn '" + string(cdn) + " monitor '" + monitorFQDN + "', trying next monitor: " + err.Error())
continue
}
if err := getCacheStats(monitorFQDN, client, []string{"kbps", "maxKbps"}, &cacheStats); err != nil {
err = errors.New("getting cache stats for CDN '" + string(cdn) + "' monitor '" + monitorFQDN + "': " + err.Error())
log.Warnln("getCapacity failed to get CacheStats from cdn '" + string(cdn) + " monitor '" + monitorFQDN + "', trying next monitor: " + err.Error())
continue
}
cap = addCapacity(cap, cacheStats, crStates, crConfig, thresholds)
break
}
if err != nil {
return CapData{}, err
}
}
return cap, nil
}
func addCapacity(cap CapData, cacheStats CacheStats, crStates tc.CRStates, crConfig tc.CRConfig, thresholds map[string]float64) CapData {
for cacheName, stats := range cacheStats.Caches {
cache, ok := crConfig.ContentServers[string(cacheName)]
if !ok {
continue
}
if cache.ServerType == nil || cache.ServerStatus == nil || cache.Profile == nil {
log.Warnln("addCapacity got cache with nil values! Skipping!")
continue
}
if !strings.HasPrefix(*cache.ServerType, string(tc.CacheTypeEdge)) {
continue
}
if len(stats.KBPS) < 1 || len(stats.MaxKBPS) < 1 {
continue
}
if string(*cache.ServerStatus) == string(tc.CacheStatusReported) || string(*cache.ServerStatus) == string(tc.CacheStatusOnline) {
if crStates.Caches[cacheName].IsAvailable {
cap.Available += float64(stats.KBPS[0].Value)
} else {
cap.Unavailable += float64(stats.KBPS[0].Value)
}
} else if string(*cache.ServerStatus) == string(tc.CacheStatusAdminDown) {
cap.Maintenance += float64(stats.KBPS[0].Value)
} else {
continue // don't add capacity for OFFLINE or other statuses
}
cap.Capacity += float64(stats.MaxKBPS[0].Value) - thresholds[*cache.Profile]
}
return cap
}
func getEdgeProfileHealthThresholdBandwidth(tx *sql.Tx) (map[string]float64, error) {
rows, err := tx.Query(`
SELECT pr.name as profile, pa.name, pa.config_file, pa.value
FROM parameter as pa
JOIN profile_parameter as pp ON pp.parameter = pa.id
JOIN profile as pr ON pp.profile = pr.id
JOIN server as s ON s.profile = pr.id
JOIN cdn as c ON c.id = s.cdn_id
JOIN type as t ON s.type = t.id
WHERE t.name LIKE 'EDGE%'
AND pa.config_file = 'rascal-config.txt'
AND pa.name = 'health.threshold.availableBandwidthInKbps'
`)
if err != nil {
return nil, errors.New("querying thresholds: " + err.Error())
}
defer rows.Close()
profileThresholds := map[string]float64{}
for rows.Next() {
profile := ""
threshStr := ""
if err := rows.Scan(&profile, &threshStr); err != nil {
return nil, errors.New("scanning thresholds: " + err.Error())
}
threshStr = strings.TrimPrefix(threshStr, ">")
thresh, err := strconv.ParseFloat(threshStr, 64)
if err != nil {
return nil, errors.New("profile '" + profile + "' health.threshold.availableBandwidthInKbps is not a number")
}
profileThresholds[profile] = thresh
}
return profileThresholds, nil
}
// CacheStats contains the Monitor CacheStats needed by Cachedata. It is NOT the full object served by the Monitor, but only the data required by the caches stats endpoint.
type CacheStats struct {
Caches map[tc.CacheName]CacheStat `json:"caches"`
}
type CacheStat struct {
KBPS []CacheStatData `json:"kbps"`
MaxKBPS []CacheStatData `json:"maxKbps"`
}
type CacheStatData struct {
Value float64 `json:"value,string"`
}
// getCacheStats gets the cache stats from the given monitor. It takes stats, a slice of stat names; and cacheStats, an object to deserialize stats into. The cacheStats type must be of the form struct {caches map[tc.CacheName]struct{statName []struct{value float64}}} with the desired stats, with appropriate member names or tags.
func getCacheStats(monitorFQDN string, client *http.Client, stats []string, cacheStats interface{}) error {
path := `/publish/CacheStats`
if len(stats) > 0 {
path += `?stats=` + strings.Join(stats, `,`)
}
resp, err := client.Get("http://" + monitorFQDN + path)
if err != nil {
return errors.New("getting CacheStats from Monitor '" + monitorFQDN + "': " + err.Error())
}
defer resp.Body.Close()
if err := json.NewDecoder(resp.Body).Decode(cacheStats); err != nil {
return errors.New("decoding CacheStats from monitor '" + monitorFQDN + "': " + err.Error())
}
return nil
}
// getCDNMonitors returns an FQDN, including port, of an online monitor for each CDN. If a CDN has no online monitors, that CDN will not have an entry in the map. If a CDN has multiple online monitors, an arbitrary one will be returned.
func getCDNMonitorFQDNs(tx *sql.Tx) (map[tc.CDNName][]string, error) {
rows, err := tx.Query(`
SELECT s.host_name, s.domain_name, s.tcp_port, c.name as cdn
FROM server as s
JOIN type as t ON s.type = t.id
JOIN status as st ON st.id = s.status
JOIN cdn as c ON c.id = s.cdn_id
WHERE t.name = '` + tc.MonitorTypeName + `'
AND st.name = '` + MonitorOnlineStatus + `'
`)
if err != nil {
return nil, errors.New("querying monitors: " + err.Error())
}
defer rows.Close()
monitors := map[tc.CDNName][]string{}
for rows.Next() {
host := ""
domain := ""
port := sql.NullInt64{}
cdn := tc.CDNName("")
if err := rows.Scan(&host, &domain, &port, &cdn); err != nil {
return nil, errors.New("scanning monitors: " + err.Error())
}
fqdn := host + "." + domain
if port.Valid {
fqdn += ":" + strconv.FormatInt(port.Int64, 10)
}
monitors[cdn] = append(monitors[cdn], fqdn)
}
return monitors, nil
}