blob: b6b75b3dd89d6d45f030d4dee6d4c820c6c98235 [file] [log] [blame]
package deliveryservice
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"database/sql"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/lib/go-tc/tovalidate"
"github.com/apache/trafficcontrol/lib/go-util"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/dbhelpers"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/tenant"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/util/ims"
validation "github.com/go-ozzo/ozzo-validation"
"github.com/lib/pq"
)
const (
deliveryServiceQueryParam = "deliveryServiceID"
requiredCapabilityQueryParam = "requiredCapability"
xmlIDQueryParam = "xmlID"
)
// RequiredCapability provides a type to define methods on.
type RequiredCapability struct {
api.APIInfoImpl `json:"-"`
tc.DeliveryServicesRequiredCapability
}
// SetLastUpdated implements the api.GenericCreator interfaces and
// sets the timestamp on insert.
func (rc *RequiredCapability) SetLastUpdated(t tc.TimeNoMod) { rc.LastUpdated = &t }
// NewReadObj implements the api.GenericReader interfaces.
func (rc *RequiredCapability) NewReadObj() interface{} {
return &tc.DeliveryServicesRequiredCapability{}
}
// SelectQuery implements the api.GenericReader interface.
func (rc *RequiredCapability) SelectQuery() string {
return `SELECT
rc.required_capability,
rc.deliveryservice_id,
ds.xml_id,
rc.last_updated
FROM deliveryservices_required_capability rc
JOIN deliveryservice ds ON ds.id = rc.deliveryservice_id`
}
// ParamColumns implements the api.GenericReader interface.
func (rc *RequiredCapability) ParamColumns() map[string]dbhelpers.WhereColumnInfo {
return map[string]dbhelpers.WhereColumnInfo{
deliveryServiceQueryParam: dbhelpers.WhereColumnInfo{
Column: "rc.deliveryservice_id",
Checker: api.IsInt,
},
xmlIDQueryParam: dbhelpers.WhereColumnInfo{
Column: "ds.xml_id",
Checker: nil,
},
requiredCapabilityQueryParam: dbhelpers.WhereColumnInfo{
Column: "rc.required_capability",
Checker: nil,
},
}
}
// DeleteQuery implements the api.GenericDeleter interface.
func (rc *RequiredCapability) DeleteQuery() string {
return `DELETE FROM deliveryservices_required_capability
WHERE deliveryservice_id = :deliveryservice_id AND required_capability = :required_capability`
}
// GetKeyFieldsInfo implements the api.Identifier interface.
func (rc RequiredCapability) GetKeyFieldsInfo() []api.KeyFieldInfo {
return []api.KeyFieldInfo{
{
Field: deliveryServiceQueryParam,
Func: api.GetIntKey,
},
{
Field: requiredCapabilityQueryParam,
Func: api.GetStringKey,
},
}
}
// GetKeys implements the api.Identifier interface and is not needed
// because Update is not available.
func (rc RequiredCapability) GetKeys() (map[string]interface{}, bool) {
if rc.DeliveryServiceID == nil {
return map[string]interface{}{deliveryServiceQueryParam: 0}, false
}
if rc.RequiredCapability == nil {
return map[string]interface{}{requiredCapabilityQueryParam: 0}, false
}
return map[string]interface{}{
deliveryServiceQueryParam: *rc.DeliveryServiceID,
requiredCapabilityQueryParam: *rc.RequiredCapability,
}, true
}
// SetKeys implements the api.Identifier interface and allows the
// create handler to assign deliveryServiceID and requiredCapability.
func (rc *RequiredCapability) SetKeys(keys map[string]interface{}) {
// this utilizes the non panicking type assertion, if the thrown
// away ok variable is false it will be the zero of the type.
id, _ := keys[deliveryServiceQueryParam].(int)
rc.DeliveryServiceID = &id
capability, _ := keys[requiredCapabilityQueryParam].(string)
rc.RequiredCapability = &capability
}
// GetAuditName implements the api.Identifier interface and
// returns the name of the object.
func (rc *RequiredCapability) GetAuditName() string {
if rc.RequiredCapability != nil {
return *rc.RequiredCapability
}
return "unknown"
}
// GetType implements the api.Identifier interface and
// returns the name of the struct.
func (rc *RequiredCapability) GetType() string {
return "deliveryservice.RequiredCapability"
}
// Validate implements the api.Validator interface.
func (rc RequiredCapability) Validate() (error, error) {
errs := validation.Errors{
deliveryServiceQueryParam: validation.Validate(rc.DeliveryServiceID, validation.Required),
requiredCapabilityQueryParam: validation.Validate(rc.RequiredCapability, validation.Required),
}
return util.JoinErrs(tovalidate.ToErrors(errs)), nil
}
// Update implements the api.CRUDer interface.
func (rc *RequiredCapability) Update(http.Header) (error, error, int) {
return nil, nil, http.StatusNotImplemented
}
// Read implements the api.CRUDer interface.
func (rc *RequiredCapability) Read(h http.Header, useIMS bool) ([]interface{}, error, error, int, *time.Time) {
tenantIDs, err := rc.getTenantIDs()
if err != nil {
return nil, nil, err, http.StatusInternalServerError, nil
}
capabilities, userErr, sysErr, errCode, maxTime := rc.getCapabilities(h, tenantIDs, useIMS)
if userErr != nil || sysErr != nil {
return nil, userErr, sysErr, errCode, nil
}
results := []interface{}{}
for _, capability := range capabilities {
results = append(results, capability)
}
return results, nil, nil, errCode, maxTime
}
func (rc *RequiredCapability) getTenantIDs() ([]int, error) {
tenantIDs, err := tenant.GetUserTenantIDListTx(rc.APIInfo().Tx.Tx, rc.APIInfo().User.TenantID)
if err != nil {
return nil, err
}
return tenantIDs, nil
}
func (rc *RequiredCapability) getCapabilities(h http.Header, tenantIDs []int, useIMS bool) ([]tc.DeliveryServicesRequiredCapability, error, error, int, *time.Time) {
var maxTime time.Time
var runSecond bool
var results []tc.DeliveryServicesRequiredCapability
where, orderBy, pagination, queryValues, errs := dbhelpers.BuildWhereAndOrderByAndPagination(rc.APIInfo().Params, rc.ParamColumns())
if len(errs) > 0 {
return nil, util.JoinErrs(errs), nil, http.StatusBadRequest, nil
}
where, queryValues = dbhelpers.AddTenancyCheck(where, queryValues, "ds.tenant_id", tenantIDs)
if useIMS {
runSecond, maxTime = ims.TryIfModifiedSinceQuery(rc.APIInfo().Tx, h, queryValues, selectMaxLastUpdatedQueryRC(where, orderBy, pagination))
if !runSecond {
log.Debugln("IMS HIT")
return results, nil, nil, http.StatusNotModified, &maxTime
}
log.Debugln("IMS MISS")
} else {
log.Debugln("Non IMS request")
}
query := rc.SelectQuery() + where + orderBy + pagination
rows, err := rc.APIInfo().Tx.NamedQuery(query, queryValues)
if err != nil {
return nil, nil, err, http.StatusInternalServerError, nil
}
defer rows.Close()
for rows.Next() {
var result tc.DeliveryServicesRequiredCapability
if err := rows.StructScan(&result); err != nil {
return nil, nil, fmt.Errorf("%s get scanning: %s", rc.GetType(), err.Error()), http.StatusInternalServerError, nil
}
results = append(results, result)
}
return results, nil, nil, http.StatusOK, &maxTime
}
func selectMaxLastUpdatedQueryRC(where string, orderBy string, pagination string) string {
return `SELECT max(t) from (
SELECT max(rc.last_updated) as t FROM deliveryservices_required_capability rc
JOIN deliveryservice ds ON ds.id = rc.deliveryservice_id ` + where + orderBy + pagination +
` UNION ALL
select max(last_updated) as t from last_deleted l where l.table_name='deliveryservices_required_capability') as res`
}
// Delete implements the api.CRUDer interface.
func (rc *RequiredCapability) Delete() (error, error, int) {
authorized, err := rc.isTenantAuthorized()
if !authorized {
return errors.New("not authorized on this tenant"), nil, http.StatusForbidden
} else if err != nil {
return nil, fmt.Errorf("checking authorization for existing DS ID: %s" + err.Error()), http.StatusInternalServerError
}
_, cdnName, _, err := dbhelpers.GetDSNameAndCDNFromID(rc.ReqInfo.Tx.Tx, *rc.DeliveryServiceID)
if err != nil {
return nil, err, http.StatusInternalServerError
}
userErr, sysErr, errCode := dbhelpers.CheckIfCurrentUserCanModifyCDN(rc.ReqInfo.Tx.Tx, string(cdnName), rc.ReqInfo.User.UserName)
if userErr != nil || sysErr != nil {
return userErr, sysErr, errCode
}
return api.GenericDelete(rc)
}
// Create implements the api.CRUDer interface.
func (rc *RequiredCapability) Create() (error, error, int) {
authorized, err := rc.isTenantAuthorized()
if !authorized {
return errors.New("not authorized on this tenant"), nil, http.StatusForbidden
} else if err != nil {
return nil, fmt.Errorf("checking authorization for existing DS ID: %s" + err.Error()), http.StatusInternalServerError
}
_, cdnName, _, err := dbhelpers.GetDSNameAndCDNFromID(rc.ReqInfo.Tx.Tx, *rc.DeliveryServiceID)
if err != nil {
return nil, err, http.StatusInternalServerError
}
userErr, sysErr, errCode := dbhelpers.CheckIfCurrentUserCanModifyCDN(rc.ReqInfo.Tx.Tx, string(cdnName), rc.ReqInfo.User.UserName)
if userErr != nil || sysErr != nil {
return userErr, sysErr, errCode
}
// Ensure DS type is only of HTTP*, DNS* types
dsType, reqCaps, topology, dsExists, err := dbhelpers.GetDeliveryServiceTypeRequiredCapabilitiesAndTopology(*rc.DeliveryServiceID, rc.APIInfo().Tx.Tx)
if err != nil {
return nil, err, http.StatusInternalServerError
}
if !dsExists {
return errors.New("a deliveryservice with id '" + strconv.Itoa(*rc.DeliveryServiceID) + "' was not found"), nil, http.StatusNotFound
}
if !dsType.IsHTTP() && !dsType.IsDNS() {
return errors.New("Invalid DS type. Only DNS and HTTP delivery services can have required capabilities"), nil, http.StatusBadRequest
}
usrErr, sysErr, rCode := rc.checkServerCap()
if usrErr != nil || sysErr != nil {
return usrErr, sysErr, rCode
}
if topology == nil {
usrErr, sysErr, rCode = rc.ensureDSServerCap()
if usrErr != nil || sysErr != nil {
return usrErr, sysErr, rCode
}
} else {
newReqCaps := append(reqCaps, *rc.RequiredCapability)
usrErr, sysErr, rCode = EnsureTopologyBasedRequiredCapabilities(rc.APIInfo().Tx.Tx, *rc.DeliveryServiceID, *topology, newReqCaps)
if usrErr != nil {
return fmt.Errorf("cannot add required capability: %v", usrErr), sysErr, rCode
}
if sysErr != nil {
return usrErr, sysErr, rCode
}
}
rows, err := rc.APIInfo().Tx.NamedQuery(rcInsertQuery(), rc)
if err != nil {
return api.ParseDBError(err)
}
defer log.Close(rows, "closing rows in RequiredCapability.Create()")
rowsAffected := 0
for rows.Next() {
rowsAffected++
if err := rows.StructScan(&rc); err != nil {
return nil, fmt.Errorf("%s create scanning: %s", rc.GetType(), err.Error()), http.StatusInternalServerError
}
}
if rowsAffected == 0 {
return nil, fmt.Errorf("%s create: no %s was inserted, no rows was returned", rc.GetType(), rc.GetType()), http.StatusInternalServerError
} else if rowsAffected > 1 {
return nil, fmt.Errorf("too many rows returned from %s insert", rc.GetType()), http.StatusInternalServerError
}
return nil, nil, http.StatusOK
}
func (rc *RequiredCapability) checkServerCap() (error, error, int) {
tx := rc.APIInfo().Tx
// Get server capability name
name := ""
if err := tx.QueryRow(`
SELECT name
FROM server_capability
WHERE name = $1`, rc.RequiredCapability).Scan(&name); err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("querying server capability for name '%v': %v", rc.RequiredCapability, err), http.StatusInternalServerError
}
if len(name) == 0 {
return fmt.Errorf("server_capability not found"), nil, http.StatusNotFound
}
return nil, nil, http.StatusOK
}
// EnsureTopologyBasedRequiredCapabilities ensures that at least one server per cachegroup
// in this delivery service's topology has this delivery service's required capabilities.
func EnsureTopologyBasedRequiredCapabilities(tx *sql.Tx, dsID int, topology string, requiredCapabilities []string) (error, error, int) {
q := `
SELECT
s.id,
s.cdn_id,
c.name,
ARRAY_REMOVE(ARRAY_AGG(ssc.server_capability ORDER BY ssc.server_capability), NULL) AS capabilities
FROM server s
LEFT JOIN server_server_capability ssc ON ssc.server = s.id
JOIN cachegroup c ON c.id = s.cachegroup
JOIN topology_cachegroup tc ON tc.cachegroup = c.name
WHERE
s.cdn_id = (SELECT cdn_id FROM deliveryservice WHERE id = $1)
AND tc.topology = $2
AND c.type != (SELECT id FROM type WHERE name = '` + tc.CacheGroupOriginTypeName + `')
GROUP BY s.id, s.cdn_id, c.name
`
rows, err := tx.Query(q, dsID, topology)
if err != nil {
return nil, fmt.Errorf("querying server capabilities in EnsureTopologyBasedRequiredCapabilities: %v", err), http.StatusInternalServerError
}
cachegroupServers, serverCapabilities, _, err := dbhelpers.ScanCachegroupsServerCapabilities(rows)
if err != nil {
return nil, err, http.StatusInternalServerError
}
if len(serverCapabilities) == 0 {
return fmt.Errorf("topology %s contains no servers in this delivery service's CDN; "+
"therefore, this delivery service's required capabilities cannot be satisfied", topology), nil, http.StatusBadRequest
}
invalidCachegroups := GetInvalidCachegroupsForRequiredCapabilities(cachegroupServers, serverCapabilities, requiredCapabilities)
if len(invalidCachegroups) > 0 {
return fmt.Errorf("the following cachegroups in this delivery service's topology do not contain at least one server with the required capabilities: %s", strings.Join(invalidCachegroups, ", ")), nil, http.StatusBadRequest
}
return nil, nil, http.StatusOK
}
// GetInvalidCachegroupsForRequiredCapabilities returns the cachegroups that are invalid w.r.t. the given
// `requiredCapabilities` of a delivery service. `cachegroupServers` is a map of cachegroup names to
// server IDs that belong to the delivery service's CDN. `serverCapabilities` is a map of those server IDs to
// their set of capabilities.
func GetInvalidCachegroupsForRequiredCapabilities(
cachegroupServers map[string][]int,
serverCapabilities map[int]map[string]struct{},
requiredCapabilities []string,
) []string {
invalidCachegroups := []string{}
for cachegroup, servers := range cachegroupServers {
cgIsValid := false
for _, server := range servers {
serverHasCapabilities := true
for _, dsReqCap := range requiredCapabilities {
if _, ok := serverCapabilities[server][dsReqCap]; !ok {
serverHasCapabilities = false
break
}
}
if serverHasCapabilities {
cgIsValid = true
break
}
}
if !cgIsValid {
invalidCachegroups = append(invalidCachegroups, cachegroup)
}
}
return invalidCachegroups
}
func (rc *RequiredCapability) ensureDSServerCap() (error, error, int) {
tx := rc.APIInfo().Tx
// Get assigned DS server IDs
dsServerIDs := []int64{}
if err := tx.Tx.QueryRow(`
SELECT ARRAY(
SELECT ds.server
FROM deliveryservice_server ds
JOIN server s ON ds.server = s.id
JOIN type t ON s.type = t.id
WHERE ds.deliveryservice=$1
AND NOT t.name LIKE 'ORG%'
)`, rc.DeliveryServiceID).Scan(pq.Array(&dsServerIDs)); err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("reading delivery service %v servers: %v", *rc.DeliveryServiceID, err), http.StatusInternalServerError
}
if len(dsServerIDs) == 0 { // no attached servers can return success right away
return nil, nil, http.StatusOK
}
// Get servers IDs that have the new capability
capServerIDs := []int64{}
if err := tx.QueryRow(`
SELECT ARRAY(
SELECT server
FROM server_server_capability
WHERE server = ANY($1)
AND server_capability=$2
)`, pq.Array(dsServerIDs), rc.RequiredCapability).Scan(pq.Array(&capServerIDs)); err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("reading servers that have server capability %v attached: %v", *rc.RequiredCapability, err), http.StatusInternalServerError
}
vIDs := getViolatingServerIDs(dsServerIDs, capServerIDs)
if len(vIDs) > 0 {
return fmt.Errorf("capability %v cannot be made required on the delivery service %v as it has the associated servers %v that do not have the capability assigned", *rc.RequiredCapability, *rc.DeliveryServiceID, strings.Join(vIDs, ",")), nil, http.StatusBadRequest
}
return nil, nil, http.StatusOK
}
func getViolatingServerIDs(dsServerIDs, capServerIDs []int64) []string {
capServerIDsMap := map[int64]struct{}{}
for _, id := range capServerIDs {
capServerIDsMap[id] = struct{}{}
}
vIDs := []string{}
for _, id := range dsServerIDs {
if _, found := capServerIDsMap[id]; !found {
vIDs = append(vIDs, strconv.FormatInt(id, 10))
}
}
return vIDs
}
func (rc *RequiredCapability) isTenantAuthorized() (bool, error) {
if rc.DeliveryServiceID == nil && rc.XMLID == nil {
return false, errors.New("delivery service has no ID or XMLID")
}
var existingID *int
var err error
switch {
case rc.DeliveryServiceID != nil:
existingID, _, err = getDSTenantIDByID(rc.APIInfo().Tx.Tx, *rc.DeliveryServiceID)
if err != nil {
return false, err
}
case rc.XMLID != nil:
existingID, _, err = getDSTenantIDByName(rc.APIInfo().Tx.Tx, tc.DeliveryServiceName(*rc.XMLID))
if err != nil {
return false, err
}
}
if existingID != nil {
authorized, err := tenant.IsResourceAuthorizedToUserTx(*existingID, rc.APIInfo().User, rc.APIInfo().Tx.Tx)
if !authorized {
return false, errors.New("not authorized on this tenant")
} else if err != nil {
return false, fmt.Errorf("checking authorization for existing DS ID: %s" + err.Error())
}
}
return true, err
}
func rcInsertQuery() string {
return `INSERT INTO deliveryservices_required_capability (
required_capability,
deliveryservice_id) VALUES (
:required_capability,
:deliveryservice_id) RETURNING deliveryservice_id, required_capability, last_updated`
}
// language=SQL
const HasRequiredCapabilitiesQuery = `
SELECT EXISTS(
SELECT drc.required_capability
FROM deliveryservices_required_capability drc
WHERE drc.deliveryservice_id = $1
)
`