blob: e0eeb2b739071a3c5969baa1134f03fa748d35d5 [file] [log] [blame]
package cachegroup
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/lib/go-util"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/auth"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/dbhelpers"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/tenant"
"github.com/lib/pq"
)
func DSPostHandlerV31(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, []string{"id"}, []string{"id"})
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
req := tc.CachegroupPostDSReq{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, errors.New("malformed JSON: "+err.Error()), nil)
return
}
resp, vals, userErr, sysErr, errCode := postDSes(inf.Tx.Tx, inf.User, inf.IntParams["id"], req.DeliveryServices)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
if err := updateDSParam(inf.Tx.Tx, req.DeliveryServices, "cacheurl_", "cacheurl"); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("updating delivery service parameters: "+err.Error()))
return
}
if err, errCode := writeChangeLog(inf.Tx.Tx, inf.User, inf.IntParams["id"]); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, nil, err)
return
}
api.WriteRespVals(w, r, resp, vals)
}
func DSPostHandlerV40(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, []string{"id"}, []string{"id"})
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
defer inf.Close()
req := tc.CachegroupPostDSReq{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, errors.New("malformed JSON: "+err.Error()), nil)
return
}
cdnNames, err := dbhelpers.GetCDNNamesFromDSIds(inf.Tx.Tx, req.DeliveryServices)
if err != nil {
api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("getting CDN names from DS IDs "+err.Error()))
return
}
userErr, sysErr, statusCode := dbhelpers.CheckIfCurrentUserCanModifyCDNs(inf.Tx.Tx, cdnNames, inf.User.UserName)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, statusCode, userErr, sysErr)
return
}
resp, vals, userErr, sysErr, errCode := postDSes(inf.Tx.Tx, inf.User, inf.IntParams["id"], req.DeliveryServices)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
return
}
if err, errCode := writeChangeLog(inf.Tx.Tx, inf.User, inf.IntParams["id"]); err != nil {
api.HandleErr(w, r, inf.Tx.Tx, errCode, nil, err)
return
}
api.WriteRespVals(w, r, resp, vals)
}
func writeChangeLog(tx *sql.Tx, user *auth.CurrentUser, cgID int) (error, int) {
cgName, ok, err := dbhelpers.GetCacheGroupNameFromID(tx, cgID)
if err != nil {
return fmt.Errorf("getting cachegroup name from ID %d: %s", cgID, err.Error()), http.StatusInternalServerError
} else if !ok {
return fmt.Errorf("cachegroup %d does not exist", cgID), http.StatusNotFound
}
api.CreateChangeLogRawTx(api.ApiChange, "CACHEGROUP: "+string(cgName)+", ID: "+strconv.Itoa(cgID)+", ACTION: Assign DSes to CacheGroup servers", user, tx)
return nil, 0
}
// postDSes returns the post response, any user error, any system error, and the HTTP status code to be returned in the event of an error.
func postDSes(tx *sql.Tx, user *auth.CurrentUser, cgID int, dsIDs []int) (tc.CacheGroupPostDSResp, map[string]interface{}, error, error, int) {
cdnName, usrErr, sysErr, errCode := getCachegroupCDN(tx, cgID)
if sysErr != nil {
sysErr = errors.New("getting cachegroup CDN: " + sysErr.Error())
}
if usrErr != nil || sysErr != nil {
return tc.CacheGroupPostDSResp{}, nil, usrErr, sysErr, errCode
}
tenantIDs, err := getDSTenants(tx, dsIDs)
if err != nil {
return tc.CacheGroupPostDSResp{}, nil, nil, errors.New("getting delivery service tenant IDs: " + err.Error()), http.StatusInternalServerError
}
for _, tenantID := range tenantIDs {
ok, err := tenant.IsResourceAuthorizedToUserTx(int(tenantID), user, tx)
if err != nil {
return tc.CacheGroupPostDSResp{}, nil, nil, errors.New("checking tenancy: " + err.Error()), http.StatusInternalServerError
}
if !ok {
return tc.CacheGroupPostDSResp{}, nil, fmt.Errorf("not authorized for delivery service tenant %d", tenantID), nil, http.StatusForbidden
}
}
topologyDSes, err := dbhelpers.GetDeliveryServicesWithTopologies(tx, dsIDs)
if err != nil {
return tc.CacheGroupPostDSResp{}, nil, nil, errors.New("getting delivery services with topologies: " + err.Error()), http.StatusInternalServerError
}
if len(topologyDSes) > 0 {
return tc.CacheGroupPostDSResp{}, nil, fmt.Errorf("delivery services %v are already assigned to a topology", topologyDSes), nil, http.StatusBadRequest
}
if err := verifyDSesCDN(tx, dsIDs, cdnName); err != nil {
return tc.CacheGroupPostDSResp{}, nil, nil, errors.New("verifying delivery service CDNs match cachegroup server CDNs: " + err.Error()), http.StatusInternalServerError
}
cgServers, err := getCachegroupServers(tx, cgID)
if err != nil {
return tc.CacheGroupPostDSResp{}, nil, nil, errors.New("getting cachegroup server names " + err.Error()), http.StatusInternalServerError
}
if err := insertCachegroupDSes(tx, cgID, dsIDs); err != nil {
return tc.CacheGroupPostDSResp{}, nil, nil, errors.New("inserting cachegroup delivery services: " + err.Error()), http.StatusInternalServerError
}
if err := updateParams(tx, dsIDs); err != nil {
return tc.CacheGroupPostDSResp{}, nil, nil, errors.New("updating delivery service parameters: " + err.Error()), http.StatusInternalServerError
}
vals := map[string]interface{}{
"alerts": tc.CreateAlerts(tc.SuccessLevel, "Delivery services successfully assigned to all the servers of cache group "+strconv.Itoa(cgID)+".").Alerts,
}
return tc.CacheGroupPostDSResp{ID: util.JSONIntStr(cgID), ServerNames: cgServers, DeliveryServices: dsIDs}, vals, nil, nil, http.StatusOK
}
func insertCachegroupDSes(tx *sql.Tx, cgID int, dsIDs []int) error {
_, err := tx.Exec(`
INSERT INTO deliveryservice_server (deliveryservice, server) (
SELECT unnest($1::int[]), server.id
FROM server
JOIN type on type.id = server.type
WHERE server.cachegroup = $2
AND (type.name LIKE 'EDGE%' OR type.name LIKE 'ORG%')
) ON CONFLICT DO NOTHING
`, pq.Array(dsIDs), cgID)
if err != nil {
return errors.New("inserting cachegroup servers: " + err.Error())
}
return nil
}
func getCachegroupServers(tx *sql.Tx, cgID int) ([]tc.CacheName, error) {
q := `
SELECT server.host_name FROM server
JOIN type on type.id = server.type
WHERE server.cachegroup = $1
AND (type.name LIKE 'EDGE%' OR type.name LIKE 'ORG%')
`
rows, err := tx.Query(q, cgID)
if err != nil {
return nil, errors.New("selecting cachegroup servers: " + err.Error())
}
defer rows.Close()
names := []tc.CacheName{}
for rows.Next() {
name := ""
if err := rows.Scan(&name); err != nil {
return nil, errors.New("querying cachegroup server names: " + err.Error())
}
names = append(names, tc.CacheName(name))
}
return names, nil
}
func verifyDSesCDN(tx *sql.Tx, dsIDs []int, cdn string) error {
q := `
SELECT count(cdn.name)
FROM cdn
JOIN deliveryservice as ds on ds.cdn_id = cdn.id
WHERE ds.id = ANY($1::bigint[])
AND cdn.name <> $2::text
`
count := 0
if err := tx.QueryRow(q, pq.Array(dsIDs), cdn).Scan(&count); err != nil {
return errors.New("querying cachegroup CDNs: " + err.Error())
}
if count > 0 {
return errors.New("servers/deliveryservices do not belong to same cdn '" + cdn + "'")
}
return nil
}
func getCachegroupCDN(tx *sql.Tx, cgID int) (string, error, error, int) {
q := `
SELECT cdn.name
FROM cdn
JOIN server on server.cdn_id = cdn.id
JOIN type on server.type = type.id
WHERE server.cachegroup = $1
AND (type.name LIKE 'EDGE%' OR type.name LIKE 'ORG%')
`
rows, err := tx.Query(q, cgID)
if err != nil {
return "", nil, errors.New("selecting cachegroup CDNs: " + err.Error()), http.StatusInternalServerError
}
defer rows.Close()
cdn := ""
for rows.Next() {
serverCDN := ""
if err := rows.Scan(&serverCDN); err != nil {
return "", nil, errors.New("scanning cachegroup CDN: " + err.Error()), http.StatusInternalServerError
}
if cdn == "" {
cdn = serverCDN
}
if cdn != serverCDN {
return "", nil, errors.New("cachegroup servers have different CDNs '" + cdn + "' and '" + serverCDN + "'"), http.StatusInternalServerError
}
}
if cdn == "" {
return "", fmt.Errorf("no edge or origin servers found on cachegroup %d", cgID), nil, http.StatusBadRequest
}
return cdn, nil, nil, http.StatusOK
}
// updateParams updated the header rewrite, and regex remap params for the given edge caches, on the given delivery services. NOTE it does not update Mid params.
func updateParams(tx *sql.Tx, dsIDs []int) error {
if err := updateDSParam(tx, dsIDs, "hdr_rw_", "edge_header_rewrite"); err != nil {
return err
}
if err := updateDSParam(tx, dsIDs, "regex_remap_", "regex_remap"); err != nil {
return err
}
return nil
}
func updateDSParam(tx *sql.Tx, dsIDs []int, paramPrefix string, dsField string) error {
_, err := tx.Exec(`
DELETE FROM parameter
WHERE name = 'location'
AND config_file IN (
SELECT CONCAT('`+paramPrefix+`', xml_id, '.config')
FROM deliveryservice as ds
WHERE ds.id = ANY($1)
AND (ds.`+dsField+` IS NULL OR ds.`+dsField+` = '')
)
`, pq.Array(dsIDs))
if err != nil {
return err
}
rows, err := tx.Query(`
WITH ats_config_location AS (
SELECT TRIM(TRAILING '/' FROM value) as v FROM parameter WHERE name = 'location' AND config_file = 'remap.config'
)
INSERT INTO parameter (name, config_file, value) (
SELECT
'location' as name,
CONCAT('`+paramPrefix+`', xml_id, '.config'),
(select v from ats_config_location)
FROM deliveryservice WHERE id = ANY($1)
) ON CONFLICT (name, config_file, value) DO UPDATE SET name = EXCLUDED.name RETURNING id
`, pq.Array(dsIDs))
if err != nil {
return errors.New("inserting parameters: " + err.Error())
}
ids := []int{}
for rows.Next() {
id := 0
if err := rows.Scan(&id); err != nil {
return errors.New("scanning inserted parameters: " + err.Error())
}
ids = append(ids, id)
}
_, err = tx.Exec(`
INSERT INTO profile_parameter (parameter, profile) (
SELECT UNNEST($1::int[]), server.profile
FROM server
JOIN deliveryservice_server as dss ON dss.server = server.id
JOIN deliveryservice as ds ON ds.id = dss.deliveryservice
WHERE ds.id = ANY($2)
) ON CONFLICT DO NOTHING
`, pq.Array(ids), pq.Array(dsIDs))
if err != nil {
return errors.New("inserting profile parameters: " + err.Error())
}
return nil
}
func getDSTenants(tx *sql.Tx, dsIDs []int) ([]int, error) {
q := `
SELECT tenant_id FROM deliveryservice
WHERE deliveryservice.id = ANY($1)
`
rows, err := tx.Query(q, pq.Array(dsIDs))
if err != nil {
return nil, errors.New("selecting delivery service tenants: " + err.Error())
}
defer rows.Close()
tenantIDs := []int{}
for rows.Next() {
id := 0
if err := rows.Scan(&id); err != nil {
return nil, errors.New("querying cachegroup delivery service tenants: " + err.Error())
}
tenantIDs = append(tenantIDs, id)
}
return tenantIDs, nil
}