blob: 589d643f6fa60f384642eddb60e105551e8e4d8b [file] [log] [blame]
package server
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"context"
"database/sql"
"fmt"
"net/http"
"sync"
"time"
"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
"github.com/lib/pq"
)
func GetServerUpdateStatusHandler(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, []string{"host_name"}, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
serverUpdateStatuses, err, _ := getServerUpdateStatus(inf.Tx.Tx, inf.Params["host_name"])
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, err)
return
}
if inf.Version == nil || inf.Version.Major < 4 {
var downgradedStatuses []tc.ServerUpdateStatus
for _, status := range serverUpdateStatuses {
downgradedStatuses = append(downgradedStatuses, status.Downgrade())
}
api.WriteRespRaw(w, r, downgradedStatuses)
} else {
api.WriteResp(w, r, serverUpdateStatuses)
}
}
func getServerUpdateStatus(tx *sql.Tx, hostName string) ([]tc.ServerUpdateStatusV40, error, error) {
if serverUpdateStatusCacheIsInitialized() {
return getServerUpdateStatusFromCache(hostName), nil, nil
}
updateStatuses := []tc.ServerUpdateStatusV40{}
selectQuery := `
/* topology_ancestors finds the ancestor topology nodes of the topology node for
* the cachegroup containing server $5.
*/
WITH RECURSIVE topology_ancestors AS (
/* This is the base case of the recursive CTE, the topology node for the
* cachegroup containing server $5.
*/
SELECT tcp.child parent, NULL cachegroup, s.id base_server_id
FROM "server" s
JOIN cachegroup c ON s.cachegroup = c.id
JOIN topology_cachegroup tc ON c."name" = tc.cachegroup
JOIN topology_cachegroup_parents tcp ON tc.id = tcp.child
WHERE s.host_name = $5
UNION ALL
/* Find all direct topology parent nodes tc of a given topology ancestor ta. */
SELECT tcp.parent, tc.cachegroup, ta.base_server_id
FROM topology_ancestors ta, topology_cachegroup_parents tcp
JOIN topology_cachegroup tc ON tcp.parent = tc.id
JOIN cachegroup c ON tc.cachegroup = c."name"
JOIN "type" t ON c."type" = t.id
WHERE ta.parent = tcp.child
AND t."name" LIKE ANY($4::TEXT[])
/* server_topology_ancestors is the set of every server whose cachegroup is an
* ancestor topology node found by topology_ancestors.
*/
), server_topology_ancestors AS (
SELECT s.id,
s.cachegroup,
s.cdn_id,
s.config_update_time > s.config_apply_time AS upd_pending,
s.revalidate_update_time > s.revalidate_apply_time AS reval_pending,
s.status,
ta.base_server_id
FROM server s
JOIN cachegroup c ON s.cachegroup = c.id
JOIN topology_ancestors ta ON c."name" = ta.cachegroup
JOIN status ON status.id = s.status
WHERE status.name = ANY($1::TEXT[])
), parentservers AS (
SELECT ps.id,
ps.cachegroup,
ps.cdn_id,
ps.config_update_time > ps.config_apply_time AS upd_pending,
ps.revalidate_update_time > ps.revalidate_apply_time AS reval_pending,
ps.status
FROM server ps
LEFT JOIN status AS pstatus ON pstatus.id = ps.status
LEFT JOIN type t ON ps."type" = t.id
WHERE pstatus.name = ANY($1::TEXT[])
AND t."name" LIKE ANY($4::TEXT[])
), use_reval_pending AS (
SELECT value::BOOLEAN
FROM parameter
WHERE name = $2
AND config_file = $3
UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
)
SELECT
s.id,
s.host_name,
type.name AS type,
s.revalidate_update_time > s.revalidate_apply_time AS server_reval_pending,
use_reval_pending.value,
s.config_update_time > s.config_apply_time AS server_upd_pending,
status.name AS status,
/* True if the cachegroup parent or any ancestor topology node has pending updates. */
TRUE IN (
SELECT sta.upd_pending FROM server_topology_ancestors sta
WHERE sta.base_server_id = s.id
AND sta.cdn_id = s.cdn_id
UNION SELECT COALESCE(BOOL_OR(ps.upd_pending), FALSE)
) AS parent_upd_pending,
/* True if the cachegroup parent or any ancestor topology node has pending revalidation. */
TRUE IN (
SELECT sta.reval_pending FROM server_topology_ancestors sta
WHERE sta.base_server_id = s.id
AND sta.cdn_id = s.cdn_id
UNION SELECT COALESCE(BOOL_OR(ps.reval_pending), FALSE)
) AS parent_reval_pending,
s.config_update_time,
s.config_apply_time,
s.revalidate_update_time,
s.revalidate_apply_time
FROM use_reval_pending,
server s
LEFT JOIN status ON s.status = status.id
LEFT JOIN cachegroup cg ON s.cachegroup = cg.id
LEFT JOIN type ON type.id = s.type
LEFT JOIN parentservers ps ON ps.cachegroup = cg.parent_cachegroup_id
AND ps.cdn_id = s.cdn_id
WHERE s.host_name = $5
GROUP BY s.id, s.host_name, type.name, server_reval_pending, use_reval_pending.value, server_upd_pending, status.name, config_update_time, config_apply_time, revalidate_update_time, revalidate_apply_time
ORDER BY s.id
`
cacheStatusesToCheck := []tc.CacheStatus{tc.CacheStatusOnline, tc.CacheStatusReported, tc.CacheStatusAdminDown}
cacheGroupTypes := []string{tc.EdgeTypePrefix + "%", tc.MidTypePrefix + "%"}
rows, err := tx.Query(selectQuery, pq.Array(cacheStatusesToCheck), tc.UseRevalPendingParameterName, tc.GlobalConfigFileName, pq.Array(cacheGroupTypes), hostName)
if err != nil {
log.Errorf("could not execute query: %s\n", err)
return nil, nil, fmt.Errorf("could not execute query: %w", err)
}
defer log.Close(rows, "getServerUpdateStatus(): unable to close db connection")
for rows.Next() {
var us tc.ServerUpdateStatusV40
var serverType string
if err := rows.Scan(&us.HostId, &us.HostName, &serverType, &us.RevalPending, &us.UseRevalPending, &us.UpdatePending, &us.Status, &us.ParentPending, &us.ParentRevalPending, &us.ConfigUpdateTime, &us.ConfigApplyTime, &us.RevalidateUpdateTime, &us.RevalidateApplyTime); err != nil {
return nil, nil, fmt.Errorf("could not scan server update status: %w", err)
}
updateStatuses = append(updateStatuses, us)
}
return updateStatuses, nil, nil
}
type serverUpdateStatuses struct {
serverMap map[string][]tc.ServerUpdateStatusV40
*sync.RWMutex
initialized bool
enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
}
var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
func serverUpdateStatusCacheIsInitialized() bool {
if serverUpdateStatusCache.enabled {
serverUpdateStatusCache.RLock()
defer serverUpdateStatusCache.RUnlock()
return serverUpdateStatusCache.initialized
}
return false
}
func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
serverUpdateStatusCache.RLock()
defer serverUpdateStatusCache.RUnlock()
return serverUpdateStatusCache.serverMap[hostname]
}
var once = sync.Once{}
func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
once.Do(func() {
if interval <= 0 {
return
}
serverUpdateStatusCache.enabled = true
refreshServerUpdateStatusCache(db, timeout)
startServerUpdateStatusCacheRefresher(interval, db, timeout)
})
}
func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
go func() {
for {
time.Sleep(interval)
refreshServerUpdateStatusCache(db, timeout)
}
}()
}
func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
if err != nil {
log.Errorf("refreshing server update status cache: %s", err.Error())
return
}
serverUpdateStatusCache.Lock()
defer serverUpdateStatusCache.Unlock()
serverUpdateStatusCache.serverMap = newServerUpdateStatuses
serverUpdateStatusCache.initialized = true
log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
}
type serverInfo struct {
id int
hostName string
typeName string
cdnId int
status string
cachegroup int
configUpdateTime *time.Time
configApplyTime *time.Time
revalUpdateTime *time.Time
revalApplyTime *time.Time
}
const getUseRevalPendingQuery = `
SELECT value::BOOLEAN
FROM parameter
WHERE name = 'use_reval_pending' AND config_file = 'global'
UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
`
const getServerInfoQuery = `
SELECT
s.id,
s.host_name,
t.name,
s.cdn_id,
st.name,
s.cachegroup,
s.config_update_time,
s.config_apply_time,
s.revalidate_update_time,
s.revalidate_apply_time
FROM server s
JOIN type t ON t.id = s.type
JOIN status st ON st.id = s.status
`
const getCacheGroupsQuery = `
SELECT
c.id,
c.parent_cachegroup_id,
c.secondary_parent_cachegroup_id
FROM cachegroup c
`
const getTopologyCacheGroupParentsQuery = `
SELECT
cg_child.id,
ARRAY_AGG(DISTINCT cg_parent.id)
FROM topology_cachegroup_parents tcp
JOIN topology_cachegroup tc_child ON tc_child.id = tcp.child
JOIN cachegroup cg_child ON cg_child.name = tc_child.cachegroup
JOIN topology_cachegroup tc_parent ON tc_parent.id = tcp.parent
JOIN cachegroup cg_parent ON cg_parent.name = tc_parent.cachegroup
GROUP BY cg_child.id
`
func getServerUpdateStatuses(db *sql.DB, timeout time.Duration) (map[string][]tc.ServerUpdateStatusV40, error) {
dbCtx, dbClose := context.WithTimeout(context.Background(), timeout)
defer dbClose()
serversByID := make(map[int]serverInfo)
updatePendingByCDNCachegroup := make(map[int]map[int]bool)
revalPendingByCDNCachegroup := make(map[int]map[int]bool)
tx, err := db.BeginTx(dbCtx, nil)
if err != nil {
return nil, fmt.Errorf("beginning server update status transaction: %w", err)
}
defer func() {
if err := tx.Commit(); err != nil && err != sql.ErrTxDone {
log.Errorln("committing server update status transaction: " + err.Error())
}
}()
useRevalPending := false
if err := tx.QueryRowContext(dbCtx, getUseRevalPendingQuery).Scan(&useRevalPending); err != nil {
return nil, fmt.Errorf("querying use_reval_pending param: %w", err)
}
// get all servers and build map of update/revalPending by cachegroup+CDN
serverRows, err := tx.QueryContext(dbCtx, getServerInfoQuery)
if err != nil {
return nil, fmt.Errorf("querying servers: %w", err)
}
defer log.Close(serverRows, "closing server rows")
for serverRows.Next() {
s := serverInfo{}
if err := serverRows.Scan(&s.id, &s.hostName, &s.typeName, &s.cdnId, &s.status, &s.cachegroup, &s.configUpdateTime, &s.configApplyTime, &s.revalUpdateTime, &s.revalApplyTime); err != nil {
return nil, fmt.Errorf("scanning servers: %w", err)
}
serversByID[s.id] = s
if _, ok := updatePendingByCDNCachegroup[s.cdnId]; !ok {
updatePendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
}
if _, ok := revalPendingByCDNCachegroup[s.cdnId]; !ok {
revalPendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
}
status := tc.CacheStatusFromString(s.status)
if tc.IsValidCacheType(s.typeName) && (status == tc.CacheStatusOnline || status == tc.CacheStatusReported || status == tc.CacheStatusAdminDown) {
if s.configUpdateTime.After(*s.configApplyTime) {
updatePendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
}
if s.revalUpdateTime.After(*s.revalApplyTime) {
revalPendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
}
}
}
if err := serverRows.Err(); err != nil {
return nil, fmt.Errorf("iterating over server rows: %w", err)
}
// get all legacy cachegroup parents
cacheGroupParents := make(map[int]map[int]struct{})
cacheGroupRows, err := tx.QueryContext(dbCtx, getCacheGroupsQuery)
if err != nil {
return nil, fmt.Errorf("querying cachegroups: %w", err)
}
defer log.Close(cacheGroupRows, "closing cachegroup rows")
for cacheGroupRows.Next() {
id := 0
parentID := new(int)
secondaryParentID := new(int)
if err := cacheGroupRows.Scan(&id, &parentID, &secondaryParentID); err != nil {
return nil, fmt.Errorf("scanning cachegroups: %w", err)
}
cacheGroupParents[id] = make(map[int]struct{})
if parentID != nil {
cacheGroupParents[id][*parentID] = struct{}{}
}
if secondaryParentID != nil {
cacheGroupParents[id][*secondaryParentID] = struct{}{}
}
}
if err := cacheGroupRows.Err(); err != nil {
return nil, fmt.Errorf("iterating over cachegroup rows: %w", err)
}
// get all topology-based cachegroup parents
topologyCachegroupRows, err := tx.QueryContext(dbCtx, getTopologyCacheGroupParentsQuery)
if err != nil {
return nil, fmt.Errorf("querying topology cachegroups: %w", err)
}
defer log.Close(topologyCachegroupRows, "closing topology cachegroup rows")
for topologyCachegroupRows.Next() {
id := 0
parents := []int32{}
if err := topologyCachegroupRows.Scan(&id, pq.Array(&parents)); err != nil {
return nil, fmt.Errorf("scanning topology cachegroup rows: %w", err)
}
for _, p := range parents {
cacheGroupParents[id][int(p)] = struct{}{}
}
}
if err = topologyCachegroupRows.Err(); err != nil {
return nil, fmt.Errorf("iterating over topology cachegroup rows: %w", err)
}
serverUpdateStatuses := make(map[string][]tc.ServerUpdateStatusV40, len(serversByID))
for serverID, server := range serversByID {
updateStatus := tc.ServerUpdateStatusV40{
HostName: server.hostName,
UpdatePending: server.configUpdateTime.After(*server.configApplyTime),
RevalPending: server.revalUpdateTime.After(*server.revalApplyTime),
UseRevalPending: useRevalPending,
HostId: serverID,
Status: server.status,
ParentPending: getParentPending(cacheGroupParents[server.cachegroup], updatePendingByCDNCachegroup[server.cdnId]),
ParentRevalPending: getParentPending(cacheGroupParents[server.cachegroup], revalPendingByCDNCachegroup[server.cdnId]),
ConfigUpdateTime: server.configUpdateTime,
ConfigApplyTime: server.configApplyTime,
RevalidateUpdateTime: server.revalUpdateTime,
RevalidateApplyTime: server.revalApplyTime,
}
serverUpdateStatuses[server.hostName] = append(serverUpdateStatuses[server.hostName], updateStatus)
}
return serverUpdateStatuses, nil
}
func getParentPending(parents map[int]struct{}, pendingByCacheGroup map[int]bool) bool {
for parent := range parents {
if pendingByCacheGroup[parent] {
return true
}
}
return false
}