blob: f0f713bbc152bf489543f44eda50527fda7c1ae2 [file] [log] [blame]
package cache
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"fmt"
"io"
"time"
"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/traffic_monitor/srvhttp"
"github.com/apache/trafficcontrol/traffic_monitor/todata"
)
// Handler is a cache handler, which fulfills the common/handler `Handler` interface.
type Handler struct {
resultChan chan Result
ToData *todata.TODataThreadsafe
}
func (h Handler) ResultChan() <-chan Result {
return h.resultChan
}
// NewHandler returns a new cache handler. Note this handler does NOT precomputes stat data before calling ResultChan, and Result.Precomputed will be nil
func NewHandler() Handler {
return Handler{resultChan: make(chan Result)}
}
// NewPrecomputeHandler constructs a new cache Handler, which precomputes stat data and populates result.Precomputed before passing to ResultChan.
func NewPrecomputeHandler(toData todata.TODataThreadsafe) Handler {
return Handler{resultChan: make(chan Result), ToData: &toData}
}
// Precompute returns whether this handler precomputes data before passing the result to the ResultChan
func (handler Handler) Precompute() bool {
return handler.ToData != nil
}
// PrecomputedData represents data parsed and pre-computed from the Result.
type PrecomputedData struct {
DeliveryServiceStats map[tc.DeliveryServiceName]*AStat
OutBytes int64
MaxKbps int64
Errors []error
Reporting bool
Time time.Time
}
// Result is the data result returned by a cache.
type Result struct {
ID tc.CacheName
Error error
Astats Astats
Time time.Time
RequestTime time.Duration
Vitals Vitals
PollID uint64
PollFinished chan<- uint64
PrecomputedData PrecomputedData
Available bool
}
// HasStat returns whether the given stat is in the Result.
func (result *Result) HasStat(stat string) bool {
computedStats := ComputedStats()
if _, ok := computedStats[stat]; ok {
return true // health poll has all computed stats
}
if _, ok := result.Astats.Ats[stat]; ok {
return true
}
return false
}
// Vitals is the vitals data returned from a cache.
type Vitals struct {
LoadAvg float64
BytesOut int64
BytesIn int64
KbpsOut int64
MaxKbpsOut int64
}
// Stat is a generic stat, including the untyped value and the time the stat was taken.
type Stat struct {
Time int64 `json:"time"`
Value interface{} `json:"value"`
}
// Stats is designed for returning via the API. It contains result history for each cache, as well as common API data.
type Stats struct {
srvhttp.CommonAPIData
Caches map[tc.CacheName]map[string][]ResultStatVal `json:"caches"`
}
// Filter filters whether stats and caches should be returned from a data set.
type Filter interface {
UseStat(name string) bool
UseCache(name tc.CacheName) bool
WithinStatHistoryMax(int) bool
}
const nsPerMs = 1000000
type StatComputeFunc func(resultInfo ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{}
// ComputedStats returns a map of cache stats which are computed by Traffic Monitor (rather than returned literally from ATS), mapped to the func to compute them.
func ComputedStats() map[string]StatComputeFunc {
return map[string]StatComputeFunc{
"availableBandwidthInKbps": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.Vitals.MaxKbpsOut - info.Vitals.KbpsOut
},
"availableBandwidthInMbps": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return (info.Vitals.MaxKbpsOut - info.Vitals.KbpsOut) / 1000
},
"bandwidth": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.Vitals.KbpsOut
},
"error-string": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
if info.Error != nil {
return info.Error.Error()
}
return "false"
},
"isAvailable": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return combinedState.IsAvailable // if the cache is missing, default to false
},
"isHealthy": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
if tc.CacheStatusFromString(serverInfo.ServerStatus) == tc.CacheStatusAdminDown {
return true
}
return combinedState.IsAvailable
},
"kbps": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.Vitals.KbpsOut
},
"gbps": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return float64(info.Vitals.KbpsOut) / 1000000.0
},
"loadavg": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.Vitals.LoadAvg
},
"maxKbps": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.Vitals.MaxKbpsOut
},
"queryTime": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.RequestTime.Nanoseconds() / nsPerMs
},
"stateUrl": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return serverProfile.Parameters.HealthPollingURL
},
"status": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return serverInfo.ServerStatus
},
"system.astatsLoad": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.AstatsLoad
},
"system.configReloadRequests": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.ConfigLoadRequest
},
"system.configReloads": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.ConfigReloads
},
"system.inf.name": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.InfName
},
"system.inf.speed": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.InfSpeed
},
"system.lastReload": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.LastReload
},
"system.lastReloadRequest": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.LastReloadRequest
},
"system.notAvailable": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.NotAvailable
},
"system.proc.loadavg": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.ProcLoadavg
},
"system.proc.net.dev": func(info ResultInfo, serverInfo tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable) interface{} {
return info.System.ProcNetDev
},
}
}
// Handle handles results fetched from a cache, parsing the raw Reader data and passing it along to a chan for further processing.
func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime time.Duration, reqEnd time.Time, reqErr error, pollID uint64, pollFinished chan<- uint64) {
log.Debugf("poll %v %v (format '%v') handle start\n", pollID, time.Now(), format)
result := Result{
ID: tc.CacheName(id),
Time: reqEnd,
RequestTime: reqTime,
PollID: pollID,
PollFinished: pollFinished,
}
if reqErr != nil {
log.Warnf("%v handler given error '%v'\n", id, reqErr) // error here, in case the thing that called Handle didn't error
result.Error = reqErr
handler.resultChan <- result
return
}
statDecoder, ok := StatsTypeDecoders[format]
if !ok {
log.Errorf("Handler cache '%s' stat type '%s' not found! Returning handle error for this cache poll.\n", id, format)
result.Error = fmt.Errorf("handler stat type %s missing", format)
handler.resultChan <- result
return
}
decodeErr := error(nil)
if decodeErr, result.Astats.Ats, result.Astats.System = statDecoder.Parse(result.ID, rdr); decodeErr != nil {
log.Warnf("%s decode error '%v'\n", id, decodeErr)
result.Error = decodeErr
handler.resultChan <- result
return
}
if result.Astats.System.ProcNetDev == "" {
log.Warnf("Handler cache %s procnetdev empty\n", id)
}
if result.Astats.System.InfSpeed == 0 {
log.Warnf("Handler cache %s inf.speed empty\n", id)
}
result.Available = true
if handler.Precompute() {
result.PrecomputedData = statDecoder.Precompute(result.ID, handler.ToData.Get(), result.Astats.Ats, result.Astats.System)
}
result.PrecomputedData.Reporting = true
result.PrecomputedData.Time = result.Time
handler.resultChan <- result
}