blob: 22dbe883b6f8c43d7a6c72ab1560072bf1b1e682 [file] [log] [blame]
package cdni
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/lib/go-rfc"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
"github.com/lestrrat-go/jwx/jwa"
"github.com/lestrrat-go/jwx/jwt"
"github.com/lib/pq"
)
const (
CapabilityQuery = `SELECT id, type, ucdn FROM cdni_capabilities WHERE type = $1 AND ucdn = $2`
AllFootprintQuery = `SELECT footprint_type, footprint_value::text[], capability_id FROM cdni_footprints`
limitsQuery = `
SELECT limit_id, scope_type, scope_value, limit_type, maximum_hard, maximum_soft, cl.telemetry_id, cl.telemetry_metric, t.id, t.type, tm.name, cl.capability_id
FROM cdni_limits AS cl
LEFT JOIN cdni_telemetry as t ON telemetry_id = t.id
LEFT JOIN cdni_telemetry_metrics as tm ON telemetry_metric = tm.name`
InsertCapabilityUpdateQuery = `INSERT INTO cdni_capability_updates (ucdn, data, async_status_id, request_type, host) VALUES ($1, $2, $3, $4, $5)`
SelectCapabilityUpdateQuery = `SELECT ucdn, data, async_status_id, request_type, host FROM cdni_capability_updates WHERE id = $1`
SelectAllCapabilityUpdatesQuery = `SELECT id, ucdn, data, request_type, host FROM cdni_capability_updates`
DeleteCapabilityUpdateQuery = `DELETE FROM cdni_capability_updates WHERE id = $1`
UpdateLimitsByCapabilityAndLimitTypeQuery = `UPDATE cdni_limits SET maximum_hard = $1 WHERE capability_id = $2 AND limit_type = $3`
hostQuery = `SELECT count(*) FROM cdni_limits WHERE $1 = ANY(scope_value)`
hostConfigLabel = "hostConfigUpdate"
)
// GetCapabilities returns the CDNi capability limits.
func GetCapabilities(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
if inf.Config.Cdni == nil || inf.Config.Secrets[0] == "" || inf.Config.Cdni.DCdnId == "" {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("cdn.conf does not contain CDNi information"))
return
}
bearerToken := getBearerToken(r)
ucdn, err := checkBearerToken(bearerToken, inf)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, err, nil)
return
}
capacities, err := getCapacities(inf, ucdn)
if err != nil {
api.HandleErr(w, r, nil, http.StatusInternalServerError, err, nil)
return
}
telemetries, err := getTelemetries(inf, ucdn)
if err != nil {
api.HandleErr(w, r, nil, http.StatusInternalServerError, err, nil)
return
}
fciCaps := Capabilities{}
capsList := make([]Capability, 0, len(capacities.Capabilities)+len(telemetries.Capabilities))
capsList = append(capsList, capacities.Capabilities...)
capsList = append(capsList, telemetries.Capabilities...)
fciCaps.Capabilities = capsList
api.WriteRespRaw(w, r, fciCaps)
}
func getBearerToken(r *http.Request) string {
if r.Header.Get(rfc.Authorization) != "" && strings.Contains(r.Header.Get(rfc.Authorization), "Bearer") {
givenTokenSplit := strings.Split(r.Header.Get(rfc.Authorization), " ")
if len(givenTokenSplit) < 2 {
return ""
}
return givenTokenSplit[1]
}
for _, cookie := range r.Cookies() {
switch cookie.Name {
case api.AccessToken:
return cookie.Value
}
}
return ""
}
// PutHostConfiguration adds the requested CDNi configuration update for a specific host to the queue and adds an async status.
func PutHostConfiguration(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, []string{"host"}, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
host := inf.Params["host"]
if errCode, userErr, sysErr := validateHostExists(host, inf.Tx.Tx); userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
if inf.Config.Cdni == nil || inf.Config.Secrets[0] == "" || inf.Config.Cdni.DCdnId == "" {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("cdn.conf does not contain CDNi information"))
return
}
bearerToken := getBearerToken(r)
ucdn, err := checkBearerToken(bearerToken, inf)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, err, nil)
return
}
var genericHostRequest GenericHostMetadata
err = json.NewDecoder(r.Body).Decode(&genericHostRequest)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("decoding host json request: %w", err))
return
}
db, err := api.GetDB(r.Context())
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting async db: %w", err))
return
}
asyncTx, err := db.Begin()
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting async tx: %w", err))
return
}
logTx, err := db.Begin()
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting log tx: %w", err))
return
}
defer logTx.Commit()
asyncStatusId, errCode, userErr, sysErr := api.InsertAsyncStatus(asyncTx, "CDNi host configuration update request received.")
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
data := genericHostRequest.HostMetadata.Metadata
_, err = inf.Tx.Tx.Query(InsertCapabilityUpdateQuery, ucdn, data, asyncStatusId, hostConfigLabel, host)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("inserting capability update request into queue: %w", err))
return
}
msg := "CDNi configuration update request received. Status updates can be found here: " + api.CurrentAsyncEndpoint + strconv.Itoa(asyncStatusId)
api.CreateChangeLogRawTx(api.ApiChange, msg, inf.User, logTx)
var alerts tc.Alerts
alerts.AddAlert(tc.Alert{
Text: msg,
Level: tc.SuccessLevel.String(),
})
w.Header().Add(rfc.Location, api.CurrentAsyncEndpoint+strconv.Itoa(asyncStatusId))
api.WriteAlerts(w, r, http.StatusAccepted, alerts)
}
// PutConfiguration adds the requested CDNi configuration update to the queue and adds an async status.
func PutConfiguration(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
if inf.Config.Cdni == nil || inf.Config.Secrets[0] == "" || inf.Config.Cdni.DCdnId == "" {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("cdn.conf does not contain CDNi information"))
return
}
bearerToken := getBearerToken(r)
ucdn, err := checkBearerToken(bearerToken, inf)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, err, nil)
return
}
var genericRequest GenericRequestMetadata
err = json.NewDecoder(r.Body).Decode(&genericRequest)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("decoding json request: %w", err))
return
}
db, err := api.GetDB(r.Context())
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting async db: %w", err))
return
}
asyncTx, err := db.Begin()
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting async tx: %w", err))
return
}
logTx, err := db.Begin()
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting log tx: %w", err))
return
}
defer logTx.Commit()
asyncStatusId, errCode, userErr, sysErr := api.InsertAsyncStatus(asyncTx, "CDNi configuration update request received.")
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
data := genericRequest.Metadata
_, err = inf.Tx.Tx.Query(InsertCapabilityUpdateQuery, ucdn, data, asyncStatusId, SupportedGenericMetadataType(genericRequest.Type), genericRequest.Host)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("inserting capability update request into queue: %w", err))
return
}
msg := "CDNi configuration update request received. Status updates can be found here: " + api.CurrentAsyncEndpoint + strconv.Itoa(asyncStatusId)
api.CreateChangeLogRawTx(api.ApiChange, msg, inf.User, logTx)
var alerts tc.Alerts
alerts.AddAlert(tc.Alert{
Text: msg,
Level: tc.SuccessLevel.String(),
})
w.Header().Add(rfc.Location, api.CurrentAsyncEndpoint+strconv.Itoa(asyncStatusId))
api.WriteAlerts(w, r, http.StatusAccepted, alerts)
}
// GetRequests returns the CDNi configuration update requests.
func GetRequests(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
var rows *sql.Rows
var err error
idParam := inf.Params["id"]
if idParam != "" {
id, parseErr := strconv.Atoi(idParam)
if parseErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, errors.New("id must be an integer"), nil)
return
}
rows, err = inf.Tx.Tx.Query(SelectAllCapabilityUpdatesQuery+" WHERE id = $1", id)
} else {
rows, err = inf.Tx.Tx.Query(SelectAllCapabilityUpdatesQuery)
}
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("querying for capability update requests: %w", err))
return
}
defer log.Close(rows, "closing capabilities update query")
requests := []ConfigurationUpdateRequest{}
for rows.Next() {
var request ConfigurationUpdateRequest
if err := rows.Scan(&request.ID, &request.UCDN, &request.Data, &request.RequestType, &request.Host); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("scanning db rows: %w", err))
return
}
requests = append(requests, request)
}
api.WriteResp(w, r, requests)
}
// PutConfigurationResponse approves or denies a CDNi configuration request and updates the configuration and async status appropriately.
func PutConfigurationResponse(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, []string{"approved", "id"}, []string{"id"})
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
reqId := inf.IntParams["id"]
approvedString := inf.Params["approved"]
approved, err := strconv.ParseBool(approvedString)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, errors.New("approved parameter must be a boolean"), nil)
return
}
db, err := api.GetDB(r.Context())
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting async db: %w", err))
return
}
logTx, err := db.Begin()
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("getting log tx: %w", err))
return
}
defer logTx.Commit()
rows, err := inf.Tx.Tx.Query(SelectCapabilityUpdateQuery, reqId)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("querying for capability update request: %w", err))
return
}
defer log.Close(rows, "closing capabilities update query")
var ucdn string
var data json.RawMessage
var host string
var asyncId int
var requestType string
count := 0
for rows.Next() {
if err := rows.Scan(&ucdn, &data, &asyncId, &requestType, &host); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("scanning db rows: %w", err))
return
}
count++
}
if count == 0 {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusNotFound, fmt.Errorf("no configuration request for that id"), nil)
return
}
if !approved {
if asyncErr := api.UpdateAsyncStatus(db, api.AsyncFailed, "Requested configuration update has been denied.", asyncId, true); asyncErr != nil {
log.Errorf("updating async status for id %d: %s", asyncId, asyncErr.Error())
}
status, err := deleteCapabilityRequest(reqId, inf.Tx.Tx)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, status, nil, fmt.Errorf("deleting configuration request from queue: %w", err))
return
}
msg := "Successfully denied configuration update request."
api.CreateChangeLogRawTx(api.ApiChange, msg, inf.User, inf.Tx.Tx)
api.WriteResp(w, r, msg)
return
}
var updatedDataList []GenericMetadata
if err = json.Unmarshal(data, &updatedDataList); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("unmarshalling data for configuration update: %w", err))
return
}
var unsupportedTypes []string
for _, updatedData := range updatedDataList {
if !updatedData.Type.isValid() {
unsupportedTypes = append(unsupportedTypes, string(updatedData.Type))
}
}
if len(unsupportedTypes) != 0 {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, fmt.Errorf("unsupported generic metadata types found: %v", strings.Join(unsupportedTypes, ", ")), nil)
return
}
for _, updatedData := range updatedDataList {
switch updatedData.Type {
case MiRequestedCapacityLimits:
var capacityRequestedLimits CapacityRequestedLimits
if err = json.Unmarshal(updatedData.Value, &capacityRequestedLimits); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("unmarshalling data for configuration update: %w", err))
return
}
for _, capLim := range capacityRequestedLimits.RequestedLimits {
capId, err := getCapabilityIdFromFootprints(capLim, ucdn, inf)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("finding capability for given information: %w", err))
return
}
query := UpdateLimitsByCapabilityAndLimitTypeQuery
queryParams := []interface{}{capLim.LimitValue, capId, capLim.LimitType}
if host != "" {
query = query + " AND $4 = ANY(scope_value)"
queryParams = []interface{}{capLim.LimitValue, capId, capLim.LimitType, host}
}
result, err := inf.Tx.Tx.Exec(query, queryParams...)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("updating capacity: %w", err))
return
}
if rowsAffected, err := result.RowsAffected(); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("updating capacity: getting rows affected: %w", err))
return
} else if rowsAffected < 1 {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusNotFound, fmt.Errorf("no capacity found for update: host: %s, type: %s, limit: %v", host, updatedData.Type, capLim), nil)
return
} else if rowsAffected > 1 {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, fmt.Errorf("capacity update affected too many rows: %d", rowsAffected))
return
}
}
}
}
if asyncErr := api.UpdateAsyncStatus(db, api.AsyncSucceeded, "Capacity requested update has been completed.", asyncId, true); asyncErr != nil {
log.Errorf("updating async status for id %v: %v", asyncId, asyncErr)
}
status, err := deleteCapabilityRequest(reqId, inf.Tx.Tx)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, status, nil, fmt.Errorf("deleting capacity request from queue: %w", err))
return
}
msg := "Successfully updated configuration."
api.CreateChangeLogRawTx(api.ApiChange, msg, inf.User, logTx)
api.WriteResp(w, r, msg)
}
func getCapabilityIdFromFootprints(updatedData CapacityLimit, ucdn string, inf *api.APIInfo) (int, error) {
tableAbbr := ""
selectClause := ""
whereClause := ""
var queryParams []interface{}
paramCount := 1
for i, footprint := range updatedData.Footprints {
if i == 0 {
tableAbbr = "f"
selectClause = "SELECT " + tableAbbr + ".capability_id FROM cdni_footprints as " + tableAbbr
whereClause = " WHERE " + tableAbbr + ".ucdn = $" + strconv.Itoa(paramCount) + " AND " + tableAbbr + ".footprint_type = $" + strconv.Itoa(paramCount+1) + " AND " + tableAbbr + ".footprint_value = $" + strconv.Itoa(paramCount+2) + "::text[]"
} else {
oldTableAbbr := tableAbbr
tableAbbr = tableAbbr + "f"
selectClause = selectClause + " JOIN cdni_footprints as " + tableAbbr + " on " + tableAbbr + ".capability_id = " + oldTableAbbr + ".capability_id"
whereClause = whereClause + " AND " + tableAbbr + ".ucdn = $" + strconv.Itoa(paramCount) + " AND " + tableAbbr + ".footprint_type = $" + strconv.Itoa(paramCount+1) + " AND " + tableAbbr + ".footprint_value = $" + strconv.Itoa(paramCount+2) + "::text[]"
}
paramCount = paramCount + 3
queryParams = append(queryParams, ucdn)
queryParams = append(queryParams, footprint.FootprintType)
queryParams = append(queryParams, pq.Array(footprint.FootprintValue))
}
selectQuery := selectClause + whereClause + " AND (SELECT count(*) from cdni_footprints as c where c.capability_id = f.capability_id) = " + strconv.Itoa(len(updatedData.Footprints))
rows, err := inf.Tx.Tx.Query(selectQuery, queryParams...)
if err != nil {
return 0, fmt.Errorf("querying for capacity update request: %w", err)
}
defer log.Close(rows, "closing footprints query")
var capabilityIds []int
rowCount := 0
for rows.Next() {
var capabilityId int
if err := rows.Scan(&capabilityId); err != nil {
return 0, fmt.Errorf("scanning db rows: %w", err)
}
rowCount++
capabilityIds = append(capabilityIds, capabilityId)
}
if len(capabilityIds) == 0 {
return 0, fmt.Errorf("no capabilities found that match all footprints: %v", updatedData.Footprints)
}
if len(capabilityIds) > 1 {
return 0, fmt.Errorf("more than 1 capability found that match all footprints: %v", updatedData.Footprints)
}
return capabilityIds[0], nil
}
func deleteCapabilityRequest(reqId int, tx *sql.Tx) (int, error) {
result, err := tx.Exec(DeleteCapabilityUpdateQuery, reqId)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("deleting configuration update: %w", err)
}
if rowsAffected, err := result.RowsAffected(); err != nil {
return http.StatusInternalServerError, fmt.Errorf("deleting configuration update: getting rows affected: %w", err)
} else if rowsAffected < 1 {
return http.StatusNotFound, errors.New("no configuration update with that key found")
} else if rowsAffected > 1 {
return http.StatusInternalServerError, fmt.Errorf("delete affected too many rows: %d", rowsAffected)
}
return http.StatusOK, nil
}
func validateHostExists(host string, tx *sql.Tx) (int, error, error) {
count := 0
if err := tx.QueryRow(hostQuery, host).Scan(&count); err != nil {
return http.StatusInternalServerError, nil, fmt.Errorf("querying if host %s exists: %w", host, err)
}
if count == 0 {
return http.StatusBadRequest, fmt.Errorf("No data found for host: %s", host), nil
}
return http.StatusOK, nil, nil
}
func checkBearerToken(bearerToken string, inf *api.APIInfo) (string, error) {
if bearerToken == "" {
return "", errors.New("bearer token is required")
}
token, err := jwt.Parse([]byte(bearerToken),
jwt.WithVerify(jwa.HS256, []byte(inf.Config.Secrets[0])),
)
if err != nil {
return "", fmt.Errorf("invalid token: %w", err)
}
if token.Expiration().Unix() < time.Now().Unix() {
return "", errors.New("token is expired")
}
if token.Audience() == nil || len(token.Audience()) == 0 {
return "", errors.New("invalid token - dcdn must be defined in audience claim")
}
if token.Audience()[0] != inf.Config.Cdni.DCdnId {
return "", errors.New("invalid token - incorrect dcdn")
}
ucdn := token.Issuer()
if ucdn != inf.User.UCDN {
return "", errors.New("user ucdn did not match token ucdn")
}
if ucdn == "" {
if inf.User.Can("ICDN:UCDN-OVERRIDE") {
ucdn = inf.Params["ucdn"]
if ucdn == "" {
return "", errors.New("admin level ucdn requests require a ucdn query parameter")
}
} else {
return "", errors.New("invalid token - empty ucdn field")
}
}
return ucdn, nil
}
func getFootprintMap(tx *sql.Tx) (map[int][]Footprint, error) {
footRows, err := tx.Query(AllFootprintQuery)
if err != nil {
return nil, fmt.Errorf("querying footprints: %w", err)
}
defer log.Close(footRows, "closing foorpint query")
footprintMap := map[int][]Footprint{}
for footRows.Next() {
var footprint Footprint
if err := footRows.Scan(&footprint.FootprintType, pq.Array(&footprint.FootprintValue), &footprint.CapabilityId); err != nil {
return nil, fmt.Errorf("scanning db rows: %w", err)
}
footprintMap[footprint.CapabilityId] = append(footprintMap[footprint.CapabilityId], footprint)
}
return footprintMap, nil
}
func getLimitsMap(tx *sql.Tx) (map[int][]LimitsQueryResponse, error) {
rows, err := tx.Query(limitsQuery)
if err != nil {
return nil, fmt.Errorf("querying limits: %w", err)
}
defer log.Close(rows, "closing capacity limits query")
limitsMap := map[int][]LimitsQueryResponse{}
for rows.Next() {
var limit LimitsQueryResponse
var scope LimitScope
if err := rows.Scan(&limit.LimitId, &scope.ScopeType, pq.Array(&scope.ScopeValue), &limit.LimitType, &limit.MaximumHard, &limit.MaximumSoft, &limit.TelemetryId, &limit.TelemetryMetic, &limit.Id, &limit.Type, &limit.Name, &limit.CapabilityId); err != nil {
return nil, fmt.Errorf("scanning db rows: %w", err)
}
if scope.ScopeType != nil {
limit.Scope = &scope
}
limitsMap[limit.CapabilityId] = append(limitsMap[limit.CapabilityId], limit)
}
return limitsMap, nil
}
func getTelemetriesMap(tx *sql.Tx) (map[int][]Telemetry, error) {
rows, err := tx.Query(`SELECT id, type, capability_id, configuration_url FROM cdni_telemetry`)
if err != nil {
return nil, errors.New("querying cdni telemetry: " + err.Error())
}
defer log.Close(rows, "closing telemetry query")
telemetryMap := map[int][]Telemetry{}
for rows.Next() {
telemetry := Telemetry{}
if err := rows.Scan(&telemetry.Id, &telemetry.Type, &telemetry.CapabilityId, &telemetry.Configuration.Url); err != nil {
return nil, errors.New("scanning telemetry: " + err.Error())
}
telemetryMap[telemetry.CapabilityId] = append(telemetryMap[telemetry.CapabilityId], telemetry)
}
return telemetryMap, nil
}
func getTelemetryMetricsMap(tx *sql.Tx) (map[string][]Metric, error) {
tmRows, err := tx.Query(`SELECT name, time_granularity, data_percentile, latency, telemetry_id FROM cdni_telemetry_metrics`)
if err != nil {
return nil, errors.New("querying cdni telemetry metrics: " + err.Error())
}
defer log.Close(tmRows, "closing telemetry metrics query")
telemetryMetricMap := map[string][]Metric{}
for tmRows.Next() {
metric := Metric{}
if err := tmRows.Scan(&metric.Name, &metric.TimeGranularity, &metric.DataPercentile, &metric.Latency, &metric.TelemetryId); err != nil {
return nil, errors.New("scanning telemetry metric: " + err.Error())
}
telemetryMetricMap[metric.TelemetryId] = append(telemetryMetricMap[metric.TelemetryId], metric)
}
return telemetryMetricMap, nil
}
// Capabilities contains an array of CDNi capabilities.
type Capabilities struct {
Capabilities []Capability `json:"capabilities"`
}
// Capability contains information about a CDNi capability.
type Capability struct {
CapabilityType SupportedCapabilities `json:"capability-type"`
CapabilityValue interface{} `json:"capability-value"`
Footprints []Footprint `json:"footprints"`
}
// CapacityCapabilityValue contains the total and host capability limits.
type CapacityCapabilityValue struct {
Limits []Limit `json:"limits"`
}
// Limit contains the information for a capacity limit.
type Limit struct {
Id string `json:"id"`
Scope *LimitScope `json:"scope,omitempty"`
LimitType CapacityLimitType `json:"limit-type"`
MaximumHard int64 `json:"maximum-hard"`
MaximumSoft int64 `json:"maximum-soft"`
TelemetrySource TelemetrySource `json:"telemetry-source"`
}
// TelemetrySource contains the information for a telemetry source.
type TelemetrySource struct {
Id string `json:"id"`
Metric string `json:"metric"`
}
// TelemetryCapabilityValue contains an array of telemetry sources.
type TelemetryCapabilityValue struct {
Sources []Telemetry `json:"sources"`
}
// Telemetry contains the information for a telemetry metric.
type Telemetry struct {
Id string `json:"id"`
Type TelemetrySourceType `json:"type"`
CapabilityId int `json:"-"`
Metrics []Metric `json:"metrics"`
Configuration TelemetryConfiguration `json:"configuration"`
}
type TelemetryConfiguration struct {
Url string `json:"url"`
}
// Metric contains the metric information for a telemetry metric.
type Metric struct {
Name string `json:"name"`
TimeGranularity int `json:"time-granularity"`
DataPercentile int `json:"data-percentile"`
Latency int `json:"latency"`
TelemetryId string `json:"-"`
}
// Footprint contains the information for a footprint.
type Footprint struct {
FootprintType FootprintType `json:"footprint-type" db:"footprint_type"`
FootprintValue []string `json:"footprint-value" db:"footprint_value"`
CapabilityId int `json:"-"`
}
// CapacityLimitType is a string of the capacity limit type.
type CapacityLimitType string
const (
Egress CapacityLimitType = "egress"
Requests = "requests"
StorageSize = "storage-size"
StorageObjects = "storage-objects"
Sessions = "sessions"
CacheSize = "cache-size"
)
// SupportedCapabilities is a string of the supported capabilities.
type SupportedCapabilities string
const (
FciTelemetry SupportedCapabilities = "FCI.Telemetry"
FciCapacityLimits = "FCI.CapacityLimits"
)
// SupportedGenericMetadataType is a string of the supported metadata type.
type SupportedGenericMetadataType string
const (
MiRequestedCapacityLimits SupportedGenericMetadataType = "MI.RequestedCapacityLimits"
)
func (s SupportedGenericMetadataType) isValid() bool {
switch s {
case MiRequestedCapacityLimits:
return true
}
return false
}
// TelemetrySourceType is a string of the telemetry source type. Right now only "generic" is supported.
type TelemetrySourceType string
const (
Generic TelemetrySourceType = "generic"
)
// FootprintType is a string of the footprint type.
type FootprintType string
const (
Ipv4Cidr FootprintType = "ipv4cidr"
Ipv6Cidr = "ipv6cidr"
Asn = "asn"
CountryCode = "countrycode"
)
// GenericHostMetadata contains the generic CDNi metadata for a requested update to a specific host.
type GenericHostMetadata struct {
Host string `json:"host"`
HostMetadata HostMetadataList `json:"host-metadata"`
}
// GenericRequestMetadata contains the generic CDNi metadata for a requested update.
type GenericRequestMetadata struct {
Type string `json:"type"`
Metadata json.RawMessage `json:"metadata"`
Host string `json:"host,omitempty"`
}
// HostMetadataList contains CDNi metadata for a specific host.
type HostMetadataList struct {
Metadata json.RawMessage `json:"metadata"`
}
// GenericMetadata contains generic CDNi metadata.
type GenericMetadata struct {
Type SupportedGenericMetadataType `json:"generic-metadata-type"`
Value json.RawMessage `json:"generic-metadata-value"`
}
// CapacityRequestedLimits contains the requested capacity limits.
type CapacityRequestedLimits struct {
RequestedLimits []CapacityLimit `json:"requested-limits"`
}
// CapacityLimit contains the limit information for a given footprint.
type CapacityLimit struct {
LimitType string `json:"limit-type"`
LimitValue int64 `json:"limit-value"`
Footprints []Footprint `json:"footprints"`
}
// ConfigurationUpdateRequest contains information about a requested CDNi configuration update request.
type ConfigurationUpdateRequest struct {
ID int `json:"id"`
UCDN string `json:"ucdn"`
Data json.RawMessage `json:"data"`
Host string `json:"host"`
RequestType string `json:"requestType" db:"request_type"`
AsyncStatusID int `json:"asyncStatusId" db:"async_status_id"`
}