/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/lib/go-util"
client "github.com/apache/trafficcontrol/traffic_ops/v2-client"
log "github.com/cihub/seelog"
influx "github.com/influxdata/influxdb/client/v2"
)
const UserAgent = "traffic-stats"
const TrafficOpsRequestTimeout = 10 * time.Second
const (
defaultPollingInterval = 10
defaultDailySummaryPollingInterval = 60
defaultConfigInterval = 300
defaultPublishingInterval = 30
defaultMaxPublishSize = 10000
)
// StartupConfig contains all fields necessary to create a traffic stats session.
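// A minimal illustrative config file (hostnames, credentials, and paths below are
// examples only, not required values) might look like:
//
//	{
//	  "toUser": "admin",
//	  "toPasswd": "password",
//	  "toUrl": "https://to.example.com",
//	  "influxUser": "influx",
//	  "influxPassword": "influxpass",
//	  "influxUrls": ["http://localhost:8086"],
//	  "pollingInterval": 10,
//	  "publishingInterval": 30,
//	  "statusToMon": "ONLINE",
//	  "seelogConfig": "/opt/traffic_stats/conf/traffic_stats_seelog.xml"
//	}
//
// Any interval left unset (0) falls back to the corresponding default constant above.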
type StartupConfig struct {
ToUser string `json:"toUser"`
ToPasswd string `json:"toPasswd"`
ToURL string `json:"toUrl"`
InfluxUser string `json:"influxUser"`
InfluxPassword string `json:"influxPassword"`
InfluxURLs []string `json:"influxUrls"`
PollingInterval int `json:"pollingInterval"`
DailySummaryPollingInterval int `json:"dailySummaryPollingInterval"`
PublishingInterval int `json:"publishingInterval"`
ConfigInterval int `json:"configInterval"`
MaxPublishSize int `json:"maxPublishSize"`
StatusToMon string `json:"statusToMon"`
SeelogConfig string `json:"seelogConfig"`
CacheRetentionPolicy string `json:"cacheRetentionPolicy"`
DsRetentionPolicy string `json:"dsRetentionPolicy"`
DailySummaryRetentionPolicy string `json:"dailySummaryRetentionPolicy"`
BpsChan chan influx.BatchPoints
InfluxDBs []*InfluxDBProps
}
// RunningConfig is used to store runtime configuration for Traffic Stats. This includes information
// about caches, cachegroups, and health URLs.
type RunningConfig struct {
HealthUrls map[string]map[string][]string // the 1st map key is CDN_name, the second is DsStats or CacheStats
CacheMap map[string]tc.Server // map hostName to cache
LastSummaryTime time.Time
}
// InfluxDBProps contains the URL and client connection for an InfluxDB server.
type InfluxDBProps struct {
URL string
InfluxClient influx.Client
}
// Timers contains the tickers that drive the main event loop.
type Timers struct {
Poll <-chan time.Time
DailySummary <-chan time.Time
Publish <-chan time.Time
Config <-chan time.Time
}
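// main polls Traffic Monitors, aggregates the resulting InfluxDB batch points
// keyed by database and retention policy, and publishes them on each Publish
// tick. SIGHUP reloads the config; SIGINT/SIGTERM/SIGQUIT flush stored metrics
// and exit.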
func main() {
var Bps map[string]influx.BatchPoints
var config StartupConfig
var err error
var tickers Timers
configFile := flag.String("cfg", "", "The config file")
flag.Parse()
if *configFile == "" {
flag.Usage()
panic("-cfg is required")
}
config, err = loadStartupConfig(*configFile, config)
if err != nil {
err = fmt.Errorf("could not load startup config: %v", err)
log.Error(err)
panic(err)
}
Bps = make(map[string]influx.BatchPoints)
config.BpsChan = make(chan influx.BatchPoints)
defer log.Flush()
configChan := make(chan RunningConfig)
go getToData(config, true, configChan)
runningConfig := <-configChan
tickers = setTimers(config)
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) // SIGKILL cannot be caught, so it is not registered
hupChan := make(chan os.Signal, 1)
signal.Notify(hupChan, syscall.SIGHUP)
for {
select {
case <-hupChan:
log.Info("HUP Received - reloading config")
newConfig, err := loadStartupConfig(*configFile, config)
if err != nil {
log.Errorf("could not load startup config: %v", err)
} else {
config = newConfig
tickers = setTimers(config)
}
case <-termChan:
log.Info("Shutdown Request Received - Sending stored metrics then quitting")
for _, val := range Bps {
sendMetrics(config, val, false)
}
os.Exit(0)
case <-tickers.Publish:
for key, val := range Bps {
go sendMetrics(config, val, true)
delete(Bps, key)
}
case runningConfig = <-configChan:
case <-tickers.Config:
go getToData(config, false, configChan)
case <-tickers.Poll:
for cdnName, urls := range runningConfig.HealthUrls {
for _, u := range urls {
log.Debug(cdnName, " -> ", u)
go calcMetrics(cdnName, u, runningConfig.CacheMap, config)
}
}
case now := <-tickers.DailySummary:
go calcDailySummary(now, config, runningConfig)
case batchPoints := <-config.BpsChan:
log.Debug("Received ", len(batchPoints.Points()), " stats")
key := fmt.Sprintf("%s%s", batchPoints.Database(), batchPoints.RetentionPolicy())
bp, ok := Bps[key]
if ok {
for _, p := range batchPoints.Points() {
bp.AddPoint(p)
}
log.Debug("Aggregating ", len(bp.Points()), " stats to ", key)
} else {
Bps[key] = batchPoints
log.Debug("Created ", key)
}
}
}
}
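// setTimers builds the polling, daily-summary, publishing, and config tickers.
// It first blocks until the next PollingInterval boundary so that poll ticks
// are aligned to the wall clock (e.g. a 10s interval ticks at :00, :10, :20, ...).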
func setTimers(config StartupConfig) Timers {
var timers Timers
pollingInterval := time.Duration(config.PollingInterval) * time.Second
<-time.NewTimer(time.Now().Truncate(pollingInterval).Add(pollingInterval).Sub(time.Now())).C
timers.Poll = time.Tick(pollingInterval)
timers.DailySummary = time.Tick(time.Duration(config.DailySummaryPollingInterval) * time.Second)
timers.Publish = time.Tick(time.Duration(config.PublishingInterval) * time.Second)
timers.Config = time.Tick(time.Duration(config.ConfigInterval) * time.Second)
return timers
}
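// loadStartupConfig reads and decodes the JSON config file, applies defaults
// for any unset intervals, replaces the logger from the Seelog config, and
// closes any InfluxDB connections held by the old config.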
func loadStartupConfig(configFile string, oldConfig StartupConfig) (StartupConfig, error) {
var config StartupConfig
file, err := os.Open(configFile)
if err != nil {
return config, err
}
decoder := json.NewDecoder(file)
err = decoder.Decode(&config)
if err != nil {
return config, err
}
config.BpsChan = oldConfig.BpsChan
if config.PollingInterval == 0 {
config.PollingInterval = defaultPollingInterval
}
if config.DailySummaryPollingInterval == 0 {
config.DailySummaryPollingInterval = defaultDailySummaryPollingInterval
}
if config.PublishingInterval == 0 {
config.PublishingInterval = defaultPublishingInterval
}
if config.ConfigInterval == 0 {
config.ConfigInterval = defaultConfigInterval
}
if config.MaxPublishSize == 0 {
config.MaxPublishSize = defaultMaxPublishSize
}
logger, err := log.LoggerFromConfigAsFile(config.SeelogConfig)
if err != nil {
return config, fmt.Errorf("error reading Seelog config %s", config.SeelogConfig)
}
log.ReplaceLogger(logger)
log.Info("Replaced logger, see log file according to", config.SeelogConfig)
if len(config.InfluxURLs) == 0 {
return config, fmt.Errorf("No InfluxDB urls provided in influxUrls, please provide at least one valid URL. e.g. \"influxUrls\": [\"http://localhost:8086\"]")
}
for _, u := range config.InfluxURLs {
influxDBProps := InfluxDBProps{
URL: u,
}
config.InfluxDBs = append(config.InfluxDBs, &influxDBProps)
}
//Close old connections explicitly
for _, host := range oldConfig.InfluxDBs {
if host.InfluxClient != nil {
host.InfluxClient.Close()
}
}
return config, nil
}
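// calcDailySummary computes the previous day's summary stats (max Gbps and
// total bytes served per CDN) once per calendar day, writing them to both
// Traffic Ops and InfluxDB.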
func calcDailySummary(now time.Time, config StartupConfig, runningConfig RunningConfig) {
log.Infof("lastSummaryTime is %v", runningConfig.LastSummaryTime)
if runningConfig.LastSummaryTime.Day() != now.Day() {
startTime := now.Truncate(24 * time.Hour).Add(-24 * time.Hour)
endTime := startTime.Add(24 * time.Hour)
log.Info("Summarizing from ", startTime, " (", startTime.Unix(), ") to ", endTime, " (", endTime.Unix(), ")")
// influx connection
influxClient, err := influxConnect(config)
if err != nil {
log.Errorf("could not connect to InfluxDb to get daily summary stats: %v", err)
return
}
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: "daily_stats",
Precision: "s",
RetentionPolicy: config.DailySummaryRetentionPolicy,
})
calcDailyMaxGbps(influxClient, bp, startTime, endTime, config)
calcDailyBytesServed(influxClient, bp, startTime, endTime, config)
// both calculations add points to the same shared batch, so send it exactly
// once here; sending from each helper would publish the same points twice
config.BpsChan <- bp
log.Info("Collected daily stats @ ", now)
}
}
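// calcDailyMaxGbps queries InfluxDB for each CDN's peak 1-minute bandwidth
// (kbps) in the given window, converts it to Gbps, and records it as the
// "daily_maxgbps" stat in Traffic Ops and in the shared batch.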
func calcDailyMaxGbps(client influx.Client, bp influx.BatchPoints, startTime time.Time, endTime time.Time, config StartupConfig) {
kilobitsToGigabits := 1000000.00
queryString := fmt.Sprintf(`select time, cdn, max(value) from "monthly"."bandwidth.cdn.1min" where time > '%s' and time < '%s' group by cdn`, startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))
log.Infof("queryString = %v\n", queryString)
res, err := queryDB(client, queryString, "cache_stats")
if err != nil {
log.Errorf("An error occured getting max bandwidth! %v\n", err)
return
}
if len(res) > 0 && len(res[0].Series) > 0 {
for _, row := range res[0].Series {
for _, record := range row.Values {
t := record[0].(string)
if record[1] != nil {
cdn := record[1].(string)
value, err := record[2].(json.Number).Float64()
if err != nil {
log.Errorf("Couldn't parse value from record %v\n", record)
continue
}
value = value / kilobitsToGigabits
statTime, _ := time.Parse(time.RFC3339, t)
log.Infof("max gbps for cdn %v = %v", cdn, value)
var statsSummary tc.StatsSummary
statsSummary.CDNName = util.StrPtr(cdn)
statsSummary.DeliveryService = util.StrPtr("all")
statsSummary.StatName = util.StrPtr("daily_maxgbps")
statsSummary.StatValue = util.FloatPtr(value)
statsSummary.SummaryTime = time.Now()
statsSummary.StatDate = &statTime
go writeSummaryStats(config, statsSummary)
//write to influxdb
tags := map[string]string{"cdn": cdn, "deliveryservice": "all"}
fields := map[string]interface{}{
"value": value,
}
pt, err := influx.NewPoint(
"daily_maxgbps",
tags,
fields,
statTime,
)
if err != nil {
log.Errorf("error adding data point for max Gbps...%v\n", err)
continue
}
bp.AddPoint(pt)
}
}
}
}
}
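// calcDailyBytesServed integrates each CDN's mean 1-minute bandwidth (kbps)
// over the given window to estimate total terabytes served, recording the
// result as the "daily_bytesserved" stat.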
func calcDailyBytesServed(client influx.Client, bp influx.BatchPoints, startTime time.Time, endTime time.Time, config StartupConfig) {
// bandwidth values are in kbit/s, so the running total accumulates kilobytes
kilobytesToTerabytes := 1000000000.00
sampleTimeSecs := 60.00
bitsToBytes := 8.00
queryString := fmt.Sprintf(`select mean(value) from "monthly"."bandwidth.cdn.1min" where time > '%s' and time < '%s' group by time(1m), cdn`, startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))
log.Infof("queryString = %v\n", queryString)
res, err := queryDB(client, queryString, "cache_stats")
if err != nil {
log.Error("An error occured getting max bandwidth!\n")
return
}
if len(res) > 0 && len(res[0].Series) > 0 {
for _, row := range res[0].Series {
bytesServed := float64(0)
cdn := row.Tags["cdn"]
for _, record := range row.Values {
if record[1] != nil {
value, err := record[1].(json.Number).Float64()
if err != nil {
log.Errorf("Couldn't parse value from record %v\n", record)
continue
}
bytesServed += value * sampleTimeSecs / bitsToBytes
}
}
bytesServedTB := bytesServed / kilobytesToTerabytes
log.Infof("TBytes served for cdn %v = %v", cdn, bytesServedTB)
//write to Traffic Ops
var statsSummary tc.StatsSummary
statsSummary.CDNName = util.StrPtr(cdn)
statsSummary.DeliveryService = util.StrPtr("all")
statsSummary.StatName = util.StrPtr("daily_bytesserved")
statsSummary.StatValue = util.FloatPtr(bytesServedTB)
statsSummary.SummaryTime = time.Now()
statsSummary.StatDate = &startTime
go writeSummaryStats(config, statsSummary)
//write to Influxdb
tags := map[string]string{"cdn": cdn, "deliveryservice": "all"}
fields := map[string]interface{}{
"value": bytesServedTB, //converted to TB
}
pt, err := influx.NewPoint(
"daily_bytesserved",
tags,
fields,
startTime,
)
if err != nil {
log.Errorf("error adding creating data point for max Gbps...%v\n", err)
continue
}
bp.AddPoint(pt)
}
}
}
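// queryDB runs an InfluxQL command against the given database and returns the
// results, surfacing both transport and response-level errors.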
func queryDB(con influx.Client, cmd string, database string) ([]influx.Result, error) {
q := influx.Query{
Command: cmd,
Database: database,
}
response, err := con.Query(q)
if err != nil {
return nil, err
}
if response.Error() != nil {
return nil, response.Error()
}
return response.Results, nil
}
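// writeSummaryStats logs in to Traffic Ops and stores the given summary stat.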
func writeSummaryStats(config StartupConfig, statsSummary tc.StatsSummary) {
to, _, err := client.LoginWithAgent(config.ToURL, config.ToUser, config.ToPasswd, true, UserAgent, false, TrafficOpsRequestTimeout)
if err != nil {
newErr := fmt.Errorf("Could not store summary stats! Error logging in to %v: %v", config.ToURL, err)
log.Error(newErr)
return
}
_, _, err = to.CreateSummaryStats(statsSummary)
if err != nil {
log.Errorf("could not create summary stats: %v", err)
}
}
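// getToData fetches servers and the TRAFFIC_STATS profile parameters from
// Traffic Ops, builds the health URLs and cache map, and sends the resulting
// RunningConfig on configChan. On error it panics during init; otherwise it
// logs and returns.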
func getToData(config StartupConfig, init bool, configChan chan RunningConfig) {
var runningConfig RunningConfig
to, _, err := client.LoginWithAgent(config.ToURL, config.ToUser, config.ToPasswd, true, UserAgent, false, TrafficOpsRequestTimeout)
if err != nil {
msg := fmt.Sprintf("Error logging in to %v: %v", config.ToURL, err)
if init {
panic(msg)
}
log.Error(msg)
return
}
servers, _, err := to.GetServers()
if err != nil {
msg := fmt.Sprintf("Error getting server list from %v: %v ", config.ToURL, err)
if init {
panic(msg)
}
log.Error(msg)
return
}
runningConfig.CacheMap = make(map[string]tc.Server)
for _, server := range servers {
runningConfig.CacheMap[server.HostName] = server
}
cacheStatPath := "/publish/CacheStats?hc=1&wildcard=1&stats="
dsStatPath := "/publish/DsStats?hc=1&wildcard=1&stats="
parameters, _, err := to.GetParametersByProfileName("TRAFFIC_STATS")
if err != nil {
msg := fmt.Sprintf("Error getting parameter list from %v: %v", config.ToURL, err)
if init {
panic(msg)
}
log.Error(msg)
return
}
for _, param := range parameters {
if param.Name == "DsStats" {
statName := param.Value
dsStatPath += "," + statName
} else if param.Name == "CacheStats" {
cacheStatPath += "," + param.Value
}
}
cacheStatPath = strings.Replace(cacheStatPath, "=,", "=", 1)
dsStatPath = strings.Replace(dsStatPath, "=,", "=", 1)
setHealthURLs(config, &runningConfig, cacheStatPath, dsStatPath)
lastSummaryTimeResponse, _, err := to.GetSummaryStatsLastUpdated(util.StrPtr("daily_maxgbps"))
if err != nil {
log.Errorf("unable to get summary stats last updated: %v", err)
} else if lastSummaryTimeResponse.Response.SummaryTime == nil {
log.Warn("unable to get last updated stats summary timestamp: daily_maxgbps stats summary not reported yet")
} else {
runningConfig.LastSummaryTime = *lastSummaryTimeResponse.Response.SummaryTime
}
configChan <- runningConfig
}
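// setHealthURLs builds, per CDN, the Traffic Monitor CacheStats and DsStats
// URLs for every monitor whose status matches StatusToMon.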
func setHealthURLs(config StartupConfig, runningConfig *RunningConfig, cacheStatPath string, dsStatPath string) {
runningConfig.HealthUrls = make(map[string]map[string][]string)
for _, server := range runningConfig.CacheMap {
if server.Type == tc.MonitorTypeName && server.Status != config.StatusToMon {
log.Debugf("Skipping %s.%s. Looking for status %s but got status %s", server.HostName, server.DomainName, config.StatusToMon, server.Status)
continue
}
if server.Type == tc.MonitorTypeName && server.Status == config.StatusToMon {
cdnName := server.CDNName
if cdnName == "" {
log.Error("Unable to find CDN name for " + server.HostName + ".. skipping")
continue
}
if runningConfig.HealthUrls[cdnName] == nil {
runningConfig.HealthUrls[cdnName] = make(map[string][]string)
}
healthURL := "http://" + server.HostName + "." + server.DomainName + ":" + strconv.Itoa(server.TCPPort) + cacheStatPath
runningConfig.HealthUrls[cdnName]["CacheStats"] = append(runningConfig.HealthUrls[cdnName]["CacheStats"], healthURL)
healthURL = "http://" + server.HostName + "." + server.DomainName + ":" + strconv.Itoa(server.TCPPort) + dsStatPath
runningConfig.HealthUrls[cdnName]["DsStats"] = append(runningConfig.HealthUrls[cdnName]["DsStats"], healthURL)
}
}
}
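// calcMetrics fetches stats from the first responsive Traffic Monitor URL for
// the CDN and dispatches to the cache or delivery service calculator based on
// the URL path.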
func calcMetrics(cdnName string, urls []string, cacheMap map[string]tc.Server, config StartupConfig) {
sampleTime := time.Now().Unix()
// get the data from trafficMonitor
var trafMonData []byte
var err error
var healthURL string
for _, u := range urls {
trafMonData, err = getURL(u)
if err != nil {
log.Errorf("error getting %s stats URL %s: %v", cdnName, u, err)
continue
}
healthURL = u
log.Infof("successfully got %s stats URL %s", cdnName, u)
break
}
if healthURL == "" {
log.Errorf("unable to get any %s stats URL - skipping timeslot", cdnName)
return
}
if strings.Contains(healthURL, "CacheStats") {
err = calcCacheValues(trafMonData, cdnName, sampleTime, cacheMap, config)
if err != nil {
log.Errorf("error calculating cache metric values for CDN %s: %v", cdnName, err)
}
} else if strings.Contains(healthURL, "DsStats") {
err = calcDsValues(trafMonData, cdnName, sampleTime, config)
if err != nil {
log.Errorf("error calculating delivery service metric values for CDN %s: %v", cdnName, err)
}
} else {
log.Warn("Don't know what to do with given %s stats URL: ", cdnName, healthURL)
}
}
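// calcDsValues parses Traffic Monitor DsStats JSON and converts each stat that
// is less than 24 hours old into an InfluxDB point in the
// "deliveryservice_stats" database, tagged by delivery service, cachegroup,
// and CDN.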
func calcDsValues(tmData []byte, cdnName string, sampleTime int64, config StartupConfig) error {
type DsStatsJSON struct {
Pp string `json:"pp"`
Date string `json:"date"`
DeliveryService map[string]map[string][]struct {
Index uint64 `json:"index"`
Time int `json:"time"`
Value string `json:"value"`
Span uint64 `json:"span"`
} `json:"deliveryService"`
}
var jData DsStatsJSON
err := json.Unmarshal(tmData, &jData)
if err != nil {
return fmt.Errorf("could not unmarshall deliveryservice stats JSON - %v", err)
}
statCount := 0
bps, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: "deliveryservice_stats",
Precision: "ms",
RetentionPolicy: config.DsRetentionPolicy,
})
for dsName, dsData := range jData.DeliveryService {
for dsMetric, dsMetricData := range dsData {
//Get the stat time and make sure it's greater than the time 24 hours ago. If not, skip it so influxdb doesn't throw retention policy errors.
validTime := time.Now().AddDate(0, 0, -1).UnixNano() / 1000000
timeStamp := int64(dsMetricData[0].Time)
if timeStamp < validTime {
log.Info(fmt.Sprintf("Skipping %v %v: %v is greater than 24 hours old.", dsName, dsMetric, timeStamp))
continue
}
var cachegroup, statName string
tags := map[string]string{
"deliveryservice": dsName,
"cdn": cdnName,
}
s := strings.Split(dsMetric, ".")
if strings.Contains(dsMetric, "type.") {
cachegroup = "all"
statName = s[2]
tags["type"] = s[1]
} else if strings.Contains(dsMetric, "total.") {
cachegroup, statName = s[0], s[1]
} else {
cachegroup, statName = s[1], s[2]
}
tags["cachegroup"] = cachegroup
// the stat time is epoch milliseconds; convert it to a time.Time
msInt := int64(dsMetricData[0].Time)
newTime := time.Unix(0, msInt*int64(time.Millisecond))
// convert stat value to float
statValue := dsMetricData[0].Value
statFloatValue, err := strconv.ParseFloat(statValue, 64)
if err != nil {
log.Warnf("stat %s with value %v couldn't be converted into a float", statName, statValue)
statFloatValue = 0.0
}
fields := map[string]interface{}{
"value": statFloatValue,
}
pt, err := influx.NewPoint(
statName,
tags,
fields,
newTime,
)
if err != nil {
log.Errorf("calculating delivery service metric values: error creating new influxDB point: %v", err)
continue
}
bps.AddPoint(pt)
statCount++
}
}
config.BpsChan <- bps
log.Info("Collected ", statCount, " deliveryservice stats values for ", cdnName, " @ ", sampleTime)
return nil
}
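// calcCacheValues parses Traffic Monitor CacheStats JSON and converts each
// stat that is less than 24 hours old into an InfluxDB point in the
// "cache_stats" database, tagged by cachegroup, hostname, CDN, and type.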
func calcCacheValues(trafmonData []byte, cdnName string, sampleTime int64, cacheMap map[string]tc.Server, config StartupConfig) error {
var jData tc.LegacyStats
err := json.Unmarshal(trafmonData, &jData)
if err != nil {
return fmt.Errorf("could not unmarshall cache stats JSON - %v", err)
}
statCount := 0
bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: "cache_stats",
Precision: "ms",
RetentionPolicy: config.CacheRetentionPolicy,
})
if err != nil {
return fmt.Errorf("calculating cache metric values: creating new influxDB batch points: %v", err)
}
for cacheName, cacheData := range jData.Caches {
cache := cacheMap[string(cacheName)]
for statName, statData := range cacheData {
if len(statData) == 0 {
continue
}
//Get the stat time and make sure it's greater than the time 24 hours ago. If not, skip it so influxdb doesn't throw retention policy errors.
validTime := time.Now().AddDate(0, 0, -1)
if statData[0].Time.Before(validTime) {
log.Info(fmt.Sprintf("Skipping %v %v: %v is greater than 24 hours old.", cacheName, statName, statData[0].Time))
continue
}
dataKey := statName
dataKey = strings.Replace(dataKey, ".bandwidth", ".kbps", 1)
dataKey = strings.Replace(dataKey, "-", "_", -1)
//Get the stat value and convert to float
statFloatValue := 0.0
if statsValue, ok := statData[0].Val.(string); !ok {
log.Warnf("stat data %s with value %v couldn't be converted into string", statName, statData[0].Val)
} else {
statFloatValue, err = strconv.ParseFloat(statsValue, 64)
if err != nil {
log.Warnf("stat %s with value %v couldn't be converted into a float", statName, statsValue)
}
}
tags := map[string]string{
"cachegroup": cache.Cachegroup,
"hostname": string(cacheName),
"cdn": cdnName,
"type": cache.Type,
}
fields := map[string]interface{}{
"value": statFloatValue,
}
pt, err := influx.NewPoint(
dataKey,
tags,
fields,
statData[0].Time,
)
if err != nil {
log.Errorf("calculating cache metric values: error creating new influxDB point: %v", err)
continue
}
bps.AddPoint(pt)
statCount++
}
}
config.BpsChan <- bps
log.Info("Collected ", statCount, " cache stats values for ", cdnName, " @ ", sampleTime)
return nil
}
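// getURL fetches the body of the given URL.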
func getURL(url string) ([]byte, error) {
// use an explicit timeout so an unresponsive Traffic Monitor cannot block the polling goroutine indefinitely
httpClient := http.Client{Timeout: TrafficOpsRequestTimeout}
resp, err := httpClient.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
}
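// influxConnect tries the configured InfluxDB URLs in random order and returns
// the first client that can be created (UDP) or created and pinged (HTTP).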
func influxConnect(config StartupConfig) (influx.Client, error) {
hosts := config.InfluxDBs
for len(hosts) > 0 {
n := rand.Intn(len(hosts))
host := hosts[n]
hosts = append(hosts[:n], hosts[n+1:]...)
parsedURL, err := url.Parse(host.URL)
if err != nil {
log.Errorf("An error occurred parsing InfluxDB URL %s: %v", host.URL, err)
continue
}
if parsedURL.Scheme == "udp" {
conf := influx.UDPConfig{
Addr: parsedURL.Host,
}
con, err := influx.NewUDPClient(conf)
if err != nil {
log.Errorf("An error occurred creating InfluxDB UDP client: %v", err)
continue
}
return con, nil
}
// if not UDP, assume an HTTP client
conf := influx.HTTPConfig{
Addr: parsedURL.String(),
Username: config.InfluxUser,
Password: config.InfluxPassword,
}
con, err := influx.NewHTTPClient(conf)
if err != nil {
log.Errorf("An error occurred creating InfluxDB HTTP client: %v", err)
continue
}
//Close old connections explicitly
if host.InfluxClient != nil {
host.InfluxClient.Close()
}
host.InfluxClient = con
_, _, err = con.Ping(10 * time.Second) // Ping takes a time.Duration; a bare 10 would mean 10 nanoseconds
if err != nil {
log.Warnf("pinging InfluxDB: %v", err)
continue
}
return con, nil
}
return nil, errors.New("could not connect to any of the InfluxDB servers defined in the influxUrls config")
}
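// sendMetrics writes the batch to InfluxDB in chunks of at most MaxPublishSize
// points. If retry is true, failed chunks are pushed back onto BpsChan to be
// retried on a later publish tick.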
func sendMetrics(config StartupConfig, bps influx.BatchPoints, retry bool) {
influxClient, err := influxConnect(config)
if err != nil {
if retry {
config.BpsChan <- bps
}
log.Errorf("sending metrics to InfluxDB: unable to get InfluxDB client: %v", err)
return
}
pts := bps.Points()
for len(pts) > 0 {
chunkBps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: bps.Database(),
Precision: bps.Precision(),
RetentionPolicy: bps.RetentionPolicy(),
})
if err != nil {
// without a valid chunk there is nothing safe to send or retry, so stop here
log.Errorf("sending metrics to InfluxDB: error creating new batch points: %v", err)
return
}
for _, p := range pts[:intMin(config.MaxPublishSize, len(pts))] {
chunkBps.AddPoint(p)
}
pts = pts[intMin(config.MaxPublishSize, len(pts)):]
err = influxClient.Write(chunkBps)
if err != nil {
if retry {
config.BpsChan <- chunkBps
}
log.Errorf("sending metrics to InfluxDB: error writing batch points: %v", err)
} else {
log.Info(fmt.Sprintf("Sent %v stats for %v", len(chunkBps.Points()), chunkBps.Database()))
}
}
}
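// intMin returns the smaller of a and b.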
func intMin(a, b int) int {
if a < b {
return a
}
return b
}