blob: 7926d42f6291a5df93536ad71c4ffb57d0c54368 [file] [log] [blame]
package cache
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"bufio"
"errors"
"fmt"
"io"
"math"
"strconv"
"strings"
"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/traffic_monitor/poller"
"github.com/apache/trafficcontrol/traffic_monitor/todata"
jsoniter "github.com/json-iterator/go"
)
// LOADAVG_SHIFT is the divisor that turns the raw "loadavg" values reported by
// stats_over_http into the floating-point load averages that ATC operators are
// used to seeing.
//
// Background: the Linux kernel keeps load averages as fixed-point integers and
// only converts them to floating point when /proc/loadavg is read. Because
// astats_over_http always read that file, operators are accustomed to the
// floating-point form. stats_over_http, however, obtains the numbers directly
// from a syscall, so they arrive unconverted.
//
// Dividing by 65536 (1 << 16) is a shortcut rather than an exact reproduction of
// the kernel's own conversion; for that, see the kernel source:
// https://github.com/torvalds/linux/blob/master/fs/proc/loadavg.c
//
// NOTE(review): presumably the syscall is sysinfo(2), whose loads[] are scaled
// by 1 << SI_LOAD_SHIFT (16) — confirm against the stats_over_http plugin.
const LOADAVG_SHIFT = 1 << 16
// init registers statsOverHTTPParse and statsOverHTTPPrecompute as the decoder
// pair for the "stats_over_http" stats type.
func init() {
	registerDecoder("stats_over_http", statsOverHTTPParse, statsOverHTTPPrecompute)
}
// stats_over_httpData is the envelope of a JSON stats_over_http payload: every
// statistic is decoded from its "global" object. The CSV form of the payload is
// parsed into the same structure.
type stats_over_httpData struct {
	Global map[string]interface{} `json:"global"`
}

// statsOverHTTPParse decodes a stats_over_http response body for the cache
// named cacheName.
//
// pollCTX must be a non-nil *poller.HTTPPollCtx; the Content-Type of its
// HTTPHeader selects the decoder. The body is decoded as JSON when the media
// type is text/json, text/javascript, application/json, or absent, and as CSV
// when it is text/csv. Media-type parameters (such as "; charset=utf-8") and
// letter case are ignored.
//
// On success it returns Statistics holding the load average and per-interface
// data, together with the "global" stat map. The keys consumed by parseLoadAvg
// have been removed from that map.
//
// An error is returned for a nil reader, an unexpected poll context, an
// unsupported Content-Type, undecodable data, an empty "global" object,
// missing or invalid load-average data, or when no interfaces were found.
func statsOverHTTPParse(cacheName string, data io.Reader, pollCTX interface{}) (Statistics, map[string]interface{}, error) {
	var stats Statistics
	if data == nil {
		log.Warnf("Cannot read stats data for cache '%s' - nil data reader", cacheName)
		return stats, nil, errors.New("handler got nil reader")
	}

	// A checked assertion: a nil or foreign context is reported as an error
	// instead of panicking the poller.
	ctx, ok := pollCTX.(*poller.HTTPPollCtx)
	if !ok || ctx == nil {
		return stats, nil, fmt.Errorf("cache '%s' has unexpected poll context type %T", cacheName, pollCTX)
	}
	ctype := ctx.HTTPHeader.Get("Content-Type")

	// Media types are case-insensitive and may carry parameters, so compare
	// only the normalised "type/subtype" part.
	mediaType := ctype
	if semi := strings.IndexByte(mediaType, ';'); semi >= 0 {
		mediaType = mediaType[:semi]
	}
	mediaType = strings.ToLower(strings.TrimSpace(mediaType))

	var sohData stats_over_httpData
	switch mediaType {
	case "text/json", "text/javascript", "application/json", "":
		json := jsoniter.ConfigFastest
		if err := json.NewDecoder(data).Decode(&sohData); err != nil {
			return stats, nil, err
		}
	case "text/csv":
		global, err := statsOverHTTPParseCSV(cacheName, data)
		if err != nil {
			return stats, nil, err
		}
		sohData.Global = global
	default:
		return stats, nil, fmt.Errorf("stats Content-Type (%s) can not be parsed by statsOverHTTP", ctype)
	}

	if len(sohData.Global) < 1 {
		return stats, nil, errors.New("No 'global' data object found in stats_over_http payload")
	}
	statMap := sohData.Global

	var err error
	if stats.Loadavg, err = parseLoadAvg(statMap); err != nil {
		return stats, nil, fmt.Errorf("Error parsing loadavg for cache '%s': %w", cacheName, err)
	}

	stats.Interfaces = parseInterfaces(statMap)
	if len(stats.Interfaces) < 1 {
		return stats, nil, fmt.Errorf("cache '%s' had no interfaces", cacheName)
	}
	return stats, statMap, nil
}
// statsOverHTTPParseCSV parses the CSV form of a stats_over_http payload, in
// which each line is "<stat name>,<value>", into a map from stat name to its
// float64 value.
//
// Everything after the first comma on a line is treated as the value. Lines
// without a comma, and lines whose value is not a finite float, are skipped.
// An error is returned if data is nil, if reading data fails, or if no line
// yields a usable stat.
func statsOverHTTPParseCSV(cacheName string, data io.Reader) (map[string]interface{}, error) {
	if data == nil {
		log.Warnf("Cannot read stats data for cache '%s' - nil data reader", cacheName)
		return nil, errors.New("handler got nil reader")
	}

	globalData := make(map[string]interface{})
	scanner := bufio.NewScanner(data)
	for scanner.Scan() {
		line := scanner.Text()
		delim := strings.IndexByte(line, ',')
		if delim < 0 {
			// No delimiter: not a "name,value" pair, skip it as invalid.
			continue
		}
		value, err := strconv.ParseFloat(line[delim+1:], 64)
		// Skip values that don't parse. ParseFloat also accepts "NaN" and
		// "Inf", which no real counter or gauge can hold, so skip those too.
		if err != nil || math.IsNaN(value) || math.IsInf(value, 0) {
			continue
		}
		globalData[line[:delim]] = value
	}
	// Without this check, a read error would silently yield truncated stats.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("reading stats_over_http csv data for cache '%s': %w", cacheName, err)
	}

	if len(globalData) < 1 {
		return nil, errors.New("no valid data found in stats_over_http payload with csv format")
	}
	return globalData, nil
}
// parseLoadAvg extracts the 1-, 5- and 15-minute load averages and the current
// process count from a stats_over_http "global" map.
//
// Values may arrive as float64 (JSON/CSV numbers) or as strings. Load averages
// are divided by LOADAVG_SHIFT. Each key is deleted from stats once it has been
// parsed successfully. On error, the returned Loadavg holds only the fields
// parsed before the failure and should not be relied upon.
func parseLoadAvg(stats map[string]interface{}) (Loadavg, error) {
	var load Loadavg
	var err error

	if load.One, err = statsOverHTTPLoadavgStat(stats, "one"); err != nil {
		return load, err
	}
	if load.Five, err = statsOverHTTPLoadavgStat(stats, "five"); err != nil {
		return load, err
	}
	if load.Fifteen, err = statsOverHTTPLoadavgStat(stats, "fifteen"); err != nil {
		return load, err
	}
	if load.TotalProcesses, err = statsOverHTTPProcessCount(stats); err != nil {
		return load, err
	}
	return load, nil
}

// statsOverHTTPLoadavgStat reads "plugin.system_stats.loadavg.<period>" from
// stats, divides it by LOADAVG_SHIFT, and deletes the key on success.
func statsOverHTTPLoadavgStat(stats map[string]interface{}, period string) (float64, error) {
	key := "plugin.system_stats.loadavg." + period
	stat, ok := stats[key]
	if !ok {
		return 0, fmt.Errorf("Data was missing '%s'", key)
	}

	var raw float64
	switch v := stat.(type) {
	case float64:
		raw = v
	case string:
		parsed, err := strconv.ParseFloat(v, 64)
		if err != nil {
			return 0, fmt.Errorf("loadavg.%s could not parse to float, was '%v' (%v)", period, stat, err)
		}
		raw = parsed
	default:
		return 0, fmt.Errorf("loadavg.%s had unrecognized type '%T'", period, v)
	}

	delete(stats, key)
	return raw / LOADAVG_SHIFT, nil
}

// statsOverHTTPProcessCount reads "plugin.system_stats.current_processes" from
// stats as a uint64 and deletes the key on success.
func statsOverHTTPProcessCount(stats map[string]interface{}) (uint64, error) {
	const key = "plugin.system_stats.current_processes"
	stat, ok := stats[key]
	if !ok {
		return 0, errors.New("Data was missing 'plugin.system_stats.current_processes'")
	}

	var count uint64
	switch v := stat.(type) {
	case float64:
		// Converting an out-of-range float to uint64 is implementation-defined.
		// NaN fails every comparison, so it is rejected explicitly. Because
		// float64(math.MaxUint64) rounds up to 2^64, ">=" is needed to reject
		// 2^64 itself.
		switch {
		case math.IsNaN(v):
			return 0, fmt.Errorf("current_processes is not a number (%v)", stat)
		case v >= math.MaxUint64:
			return 0, fmt.Errorf("current number of processes cannot be represented as a uint64 - too big (%v)", stat)
		case v < 0:
			return 0, fmt.Errorf("current_processes cannot be negative, got %v", stat)
		}
		count = uint64(v)
	case string:
		parsed, err := strconv.ParseUint(v, 10, 64)
		if err != nil {
			return 0, fmt.Errorf("current_processes could not parse to uint64, was '%v' (%v)", stat, err)
		}
		count = parsed
	default:
		return 0, fmt.Errorf("current_processes had unrecognized type '%T'", v)
	}

	delete(stats, key)
	return count, nil
}
// parseInterfaces collects per-interface network statistics from a
// stats_over_http "global" map.
//
// Keys of the form "plugin.system_stats.net.<interface>.<stat>" are examined.
// The stat is whatever follows the last dot, so dotted interface names such as
// VLAN devices ("bond0.100") are kept intact. "rx_bytes", "tx_bytes" and
// "speed" populate BytesIn, BytesOut and Speed of the named interface. Other
// net stats are ignored. Values that cannot be converted are logged and skipped
// without creating an interface entry. stats itself is not modified.
func parseInterfaces(stats map[string]interface{}) map[string]Interface {
	const netPrefix = "plugin.system_stats.net."

	ifaces := make(map[string]Interface)
	for stat, value := range stats {
		if !strings.HasPrefix(stat, netPrefix) {
			continue
		}

		// The form of the output isn't fully documented, so when something isn't
		// of the right form you don't KNOW something went wrong; it could just be
		// that you're not looking at what you think you're looking at. So when that
		// happens we issue a warning and continue.
		rest := strings.TrimPrefix(stat, netPrefix)
		dot := strings.LastIndexByte(rest, '.')
		if dot <= 0 || dot == len(rest)-1 {
			log.Warnf("stat '%s' appears to be network related, but is not an interface", stat)
			continue
		}
		name, field := rest[:dot], rest[dot+1:]

		switch field {
		case "rx_bytes":
			rxBytes, ok := statsOverHTTPInterfaceBytes(name, "received", value)
			if !ok {
				continue
			}
			iface := ifaces[name]
			iface.BytesIn = rxBytes
			ifaces[name] = iface
		case "tx_bytes":
			txBytes, ok := statsOverHTTPInterfaceBytes(name, "transmitted", value)
			if !ok {
				continue
			}
			iface := ifaces[name]
			iface.BytesOut = txBytes
			ifaces[name] = iface
		case "speed":
			speed, ok := statsOverHTTPInterfaceSpeed(name, value)
			if !ok {
				// Previously an unrecognized type fell through and recorded a
				// bogus zero speed; skip it like the byte counters do.
				continue
			}
			iface := ifaces[name]
			iface.Speed = speed
			ifaces[name] = iface
		}
	}
	return ifaces
}

// statsOverHTTPInterfaceBytes converts a byte counter of interface iface to a
// uint64. direction ("received" or "transmitted") is used only in log messages.
// It returns false, after logging a warning, when the value is unusable.
func statsOverHTTPInterfaceBytes(iface string, direction string, value interface{}) (uint64, bool) {
	switch v := value.(type) {
	case float64:
		// Converting an out-of-range float to uint64 is implementation-defined.
		// NaN fails every comparison, and float64(math.MaxUint64) rounds up to
		// 2^64, so both need explicit handling.
		switch {
		case math.IsNaN(v):
			log.Warnf("%s bytes for interface '%s' is not a number (%v)", direction, iface, value)
			return 0, false
		case v >= math.MaxUint64:
			log.Warnf("%s bytes for interface '%s' cannot be represented as a uint64 - too big (%v)", direction, iface, value)
			return 0, false
		case v < 0:
			log.Warnf("%s bytes for interface '%s' was negative (%v)", direction, iface, value)
			return 0, false
		}
		return uint64(v), true
	case string:
		parsed, err := strconv.ParseUint(v, 10, 64)
		if err != nil {
			log.Warnf("%s bytes for interface '%s' cannot parse as uint64, was '%v' (%v)", direction, iface, value, err)
			return 0, false
		}
		return parsed, true
	default:
		log.Warnf("%s bytes for interface '%s' had unrecognized type '%T'", direction, iface, value)
		return 0, false
	}
}

// statsOverHTTPInterfaceSpeed converts the speed of interface iface to an
// int64. Negative values are accepted. It returns false, after logging a
// warning, when the value is unusable.
func statsOverHTTPInterfaceSpeed(iface string, value interface{}) (int64, bool) {
	switch v := value.(type) {
	case float64:
		// float64(math.MaxInt64) rounds up to 2^63, hence ">=". NaN fails every
		// comparison, so it is rejected explicitly.
		if math.IsNaN(v) || v >= math.MaxInt64 || v < math.MinInt64 {
			log.Warnf("speed of interface '%s' outside of representable integer range: %v", iface, value)
			return 0, false
		}
		return int64(v), true
	case string:
		parsed, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			log.Warnf("speed of interface '%s' cannot parse to int64, was '%v': %v", iface, value, err)
			return 0, false
		}
		return parsed, true
	default:
		log.Warnf("speed for interface '%s' had unrecognized type '%T'", iface, value)
		return 0, false
	}
}
// parseNumericStat converts a decoded stat value into a uint64 counter.
//
// Unsigned and signed integers (uint, uint32, uint64, int, int32, int64),
// floats (float32, float64; the fraction is truncated) and base-10 strings are
// accepted. An error is returned for negative values, NaN, floats that do not
// fit in a uint64, unparsable strings, and any other type.
func parseNumericStat(value interface{}) (uint64, error) {
	switch v := value.(type) {
	case uint:
		return uint64(v), nil
	case uint32:
		return uint64(v), nil
	case uint64:
		return v, nil
	case int:
		if v < 0 {
			return 0, errors.New("value was negative")
		}
		return uint64(v), nil
	case int32:
		if v < 0 {
			return 0, errors.New("value was negative")
		}
		return uint64(v), nil
	case int64:
		if v < 0 {
			return 0, errors.New("value was negative")
		}
		// Convert, don't assert: value.(uint64) on an int64 panics.
		return uint64(v), nil
	case float64:
		// float64(math.MaxUint64) rounds up to 2^64, so ">=" is needed to reject
		// 2^64 itself. NaN fails every comparison and must be checked separately.
		if math.IsNaN(v) || v >= math.MaxUint64 || v < 0 {
			return 0, errors.New("value out of range for uint64")
		}
		return uint64(v), nil
	case float32:
		if math.IsNaN(float64(v)) || v >= math.MaxUint64 || v < 0 {
			return 0, errors.New("value out of range for uint64")
		}
		// Convert, don't assert: value.(float64) on a float32 panics.
		return uint64(v), nil
	case string:
		statVal, err := strconv.ParseUint(v, 10, 64)
		if err != nil {
			return 0, fmt.Errorf("could not parse '%v' to uint64: %v", value, err)
		}
		return statVal, nil
	default:
		return 0, fmt.Errorf("value '%v' is of unrecognized type %T", value, value)
	}
}
// statsOverHTTPPrecompute derives per-poll aggregates for cacheName from the
// parsed Statistics and the remaining "global" stats of a stats_over_http poll.
//
// OutBytes is the sum of BytesOut over all interfaces. MaxKbps is the largest
// interface Speed multiplied by 1000. (NOTE(review): presumably Speed is in
// Mb/s, making the result kb/s — confirm.)
//
// Every misc stat named "plugin.remap_stats.<a>.<b>.<c...>.<counter>" is mapped
// to a delivery service via data.DeliveryServiceRegexes.DeliveryService(<c...>,
// <b>, <a>). Its status_2xx/3xx/4xx/5xx, out_bytes and in_bytes counters are
// accumulated into that service's DSStat. Stats that cannot be matched or
// parsed are logged at info level and appended to Errors; they do not stop the
// computation.
func statsOverHTTPPrecompute(cacheName string, data todata.TOData, stats Statistics, miscStats map[string]interface{}) PrecomputedData {
	const remapPrefix = "plugin.remap_stats."

	precomputed := PrecomputedData{
		DeliveryServiceStats: make(map[string]*DSStat),
	}

	for _, iface := range stats.Interfaces {
		precomputed.OutBytes += iface.BytesOut
		if iface.Speed > precomputed.MaxKbps {
			precomputed.MaxKbps = iface.Speed
		}
	}
	precomputed.MaxKbps *= 1000

	// reject logs a per-stat failure and records it on the result.
	reject := func(stat string, value interface{}, err error) {
		log.Infof("precomputing cache %s stat %s value %v error %v", cacheName, stat, value, err)
		precomputed.Errors = append(precomputed.Errors, err)
	}

	for stat, value := range miscStats {
		if !strings.HasPrefix(stat, remapPrefix) {
			continue
		}

		parts := strings.Split(strings.TrimPrefix(stat, remapPrefix), ".")
		if len(parts) < 3 {
			reject(stat, value, errors.New("stat has no remap_stats deliveryservice and name parts"))
			continue
		}
		last := len(parts) - 1

		ds, found := data.DeliveryServiceRegexes.DeliveryService(strings.Join(parts[2:last], "."), parts[1], parts[0])
		switch {
		case !found:
			reject(stat, value, errors.New("No Delivery Service match for stat"))
			continue
		case ds == "":
			reject(stat, value, errors.New("Empty Delivery Service FQDN"))
			continue
		}

		dsName := string(ds)
		dsStat := precomputed.DeliveryServiceStats[dsName]
		if dsStat == nil {
			dsStat = new(DSStat)
		}

		amount, err := parseNumericStat(value)
		if err != nil {
			reject(stat, value, fmt.Errorf("couldn't parse numeric stat: %v", err))
			continue
		}

		switch counter := parts[last]; counter {
		case "status_2xx":
			dsStat.Status2xx += amount
		case "status_3xx":
			dsStat.Status3xx += amount
		case "status_4xx":
			dsStat.Status4xx += amount
		case "status_5xx":
			dsStat.Status5xx += amount
		case "out_bytes":
			dsStat.OutBytes += amount
		case "in_bytes":
			dsStat.InBytes += amount
		default:
			reject(stat, value, fmt.Errorf("Unknown stat '%s'", counter))
			continue
		}
		precomputed.DeliveryServiceStats[dsName] = dsStat
	}
	return precomputed
}