blob: 2742ae57617765a4aa6911c6084e4767b7dbb314 [file] [log] [blame]
package riaksvc
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import (
"database/sql"
"encoding/json"
"errors"
"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/basho/riak-go-client"
)
// Bucket names and key-naming defaults used by the functions in this file.
const (
	// DeliveryServiceSSLKeysBucket is the bucket in which delivery service SSL keys are stored.
	DeliveryServiceSSLKeysBucket = "ssl"

	// DNSSECKeysBucket is the bucket in which DNSSEC keys are stored.
	DNSSECKeysBucket = "dnssec"

	// DefaultDSSSLKeyVersion is the version suffix used in a delivery service
	// SSL key's object key when the caller does not supply a version.
	DefaultDSSSLKeyVersion = "latest"

	// URLSigKeysBucket is the bucket in which URL signature keys are stored.
	URLSigKeysBucket = "url_sig_keys"
)
// MakeDSSSLKeyKey builds the object key for a delivery service's SSL keys, in
// the form "<dsName>-<version>". An empty version is replaced by
// DefaultDSSSLKeyVersion.
func MakeDSSSLKeyKey(dsName, version string) string {
	suffix := version
	if suffix == "" {
		suffix = DefaultDSSSLKeyVersion
	}
	return dsName + "-" + suffix
}
// GetDeliveryServiceSSLKeysObj fetches and JSON-decodes the SSL keys object
// stored in DeliveryServiceSSLKeysBucket for the delivery service xmlID at the
// given version. The object key is built with MakeDSSSLKeyKey.
//
// The bool result reports whether an object was found; a missing object is not
// an error. Whenever the returned error is non-nil the bool is false.
func GetDeliveryServiceSSLKeysObj(xmlID string, version string, tx *sql.Tx, authOpts *riak.AuthOptions) (tc.DeliveryServiceSSLKeys, bool, error) {
	sslKeys := tc.DeliveryServiceSSLKeys{}
	exists := false

	// fetch is handed to WithClusterTx, which supplies the StorageCluster.
	fetch := func(cluster StorageCluster) error {
		objs, fetchErr := FetchObjectValues(MakeDSSSLKeyKey(xmlID, version), DeliveryServiceSSLKeysBucket, cluster)
		switch {
		case fetchErr != nil:
			return fetchErr
		case len(objs) == 0:
			return nil // nothing stored under this key
		}
		// Only the first returned object is decoded.
		if jsonErr := json.Unmarshal(objs[0].Value, &sslKeys); jsonErr != nil {
			return errors.New("unmarshalling Riak result: " + jsonErr.Error())
		}
		exists = true
		return nil
	}

	if err := WithClusterTx(tx, authOpts, fetch); err != nil {
		return sslKeys, false, err
	}
	return sslKeys, exists, nil
}
// GetDeliveryServiceSSLKeysObjTx fetches and JSON-decodes the SSL keys object
// stored in DeliveryServiceSSLKeysBucket for the delivery service xmlID at the
// given version. The object key is "<xmlID>-<version>"; an empty version
// selects "<xmlID>-latest".
//
// The bool result reports whether an object was found; a missing object is not
// an error. Whenever the returned error is non-nil the bool is false.
//
// Failures are returned to the caller and are not additionally logged here, so
// each error is reported once, the same as GetDeliveryServiceSSLKeysObj.
func GetDeliveryServiceSSLKeysObjTx(xmlID string, version string, tx *sql.Tx, authOpts *riak.AuthOptions) (tc.DeliveryServiceSSLKeys, bool, error) {
	// Build the object key in a local instead of rewriting the xmlID parameter.
	if version == "" {
		version = "latest"
	}
	objKey := xmlID + "-" + version

	key := tc.DeliveryServiceSSLKeys{}
	found := false
	err := WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
		ro, err := FetchObjectValues(objKey, DeliveryServiceSSLKeysBucket, cluster)
		if err != nil {
			return err
		}
		if len(ro) == 0 {
			return nil // not found
		}
		// Only the first returned object is decoded.
		if err := json.Unmarshal(ro[0].Value, &key); err != nil {
			return errors.New("unmarshalling Riak result: " + err.Error())
		}
		found = true
		return nil
	})
	if err != nil {
		return key, false, err
	}
	return key, found, nil
}
// PutDeliveryServiceSSLKeysObj JSON-encodes key and saves it to
// DeliveryServiceSSLKeysBucket under the object key built by MakeDSSSLKeyKey
// from key.DeliveryService and key.Version.
//
// It returns an error if the key cannot be marshalled or if saving fails.
func PutDeliveryServiceSSLKeysObj(key tc.DeliveryServiceSSLKeys, tx *sql.Tx, authOpts *riak.AuthOptions) error {
	payload, marshalErr := json.Marshal(&key)
	if marshalErr != nil {
		return errors.New("marshalling key: " + marshalErr.Error())
	}

	// save is handed to WithClusterTx, which supplies the StorageCluster.
	save := func(cluster StorageCluster) error {
		// NOTE(review): the DNSSEC and URL-sig Put functions in this file use
		// "application/json"; confirm "text/json" is intended here.
		saveErr := SaveObject(&riak.Object{
			ContentType:     "text/json",
			Charset:         "utf-8",
			ContentEncoding: "utf-8",
			Key:             MakeDSSSLKeyKey(key.DeliveryService, string(key.Version)),
			Value:           payload,
		}, DeliveryServiceSSLKeysBucket, cluster)
		if saveErr != nil {
			return errors.New("saving Riak object: " + saveErr.Error())
		}
		return nil
	}
	return WithClusterTx(tx, authOpts, save)
}
// PutDeliveryServiceSSLKeysObjTx JSON-encodes key and saves it to
// DeliveryServiceSSLKeysBucket under the object key built by MakeDSSSLKeyKey
// from key.DeliveryService and key.Version. Its body performs the same steps
// as PutDeliveryServiceSSLKeysObj.
//
// It returns an error if the key cannot be marshalled or if saving fails.
func PutDeliveryServiceSSLKeysObjTx(key tc.DeliveryServiceSSLKeys, tx *sql.Tx, authOpts *riak.AuthOptions) error {
	encoded, encodeErr := json.Marshal(&key)
	if encodeErr != nil {
		return errors.New("marshalling key: " + encodeErr.Error())
	}

	return WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
		// NOTE(review): the DNSSEC and URL-sig Put functions in this file use
		// "application/json"; confirm "text/json" is intended here.
		sslObj := &riak.Object{
			ContentType:     "text/json",
			Charset:         "utf-8",
			ContentEncoding: "utf-8",
			Key:             MakeDSSSLKeyKey(key.DeliveryService, string(key.Version)),
			Value:           encoded,
		}
		saveErr := SaveObject(sslObj, DeliveryServiceSSLKeysBucket, cluster)
		if saveErr == nil {
			return nil
		}
		return errors.New("saving Riak object: " + saveErr.Error())
	})
}
// Ping tries the servers returned by GetRiakServers one at a time until one
// answers a ping.
//
// For each server it builds a single-server cluster, starts it, pings it and
// stops it again. Any failure for an individual server is logged and the next
// server is tried. On the first successful ping it returns a response with
// Status "OK" and Server "<FQDN>:<Port>".
//
// An error is returned if the server list cannot be fetched or if no server
// could be pinged.
func Ping(tx *sql.Tx, authOpts *riak.AuthOptions) (tc.RiakPingResp, error) {
	riakServers, err := GetRiakServers(tx)
	if err != nil {
		return tc.RiakPingResp{}, errors.New("getting riak servers: " + err.Error())
	}

	for _, srv := range riakServers {
		cluster, clusterErr := RiakServersToCluster([]ServerAddr{srv}, authOpts)
		if clusterErr != nil {
			log.Errorf("RiakServersToCluster error for server %+v: %+v\n", srv, clusterErr.Error())
			continue // try another server
		}
		if startErr := cluster.Start(); startErr != nil {
			log.Errorln("starting Riak cluster (for ping): " + startErr.Error())
			continue
		}

		// The cluster is stopped right after the ping whether or not it succeeded.
		pingErr := PingCluster(cluster)
		stopErr := cluster.Stop()

		if pingErr != nil {
			if stopErr != nil {
				log.Errorln("stopping Riak cluster (after ping error): " + stopErr.Error())
			}
			log.Errorf("Riak PingCluster error for server %+v: %+v\n", srv, pingErr.Error())
			continue
		}
		if stopErr != nil {
			// A failed stop after a successful ping is logged but does not fail the ping.
			log.Errorln("stopping Riak cluster (after ping success): " + stopErr.Error())
		}
		return tc.RiakPingResp{Status: "OK", Server: srv.FQDN + ":" + srv.Port}, nil
	}

	return tc.RiakPingResp{}, errors.New("failed to ping any Riak server")
}
// GetDNSSECKeys fetches and JSON-decodes the DNSSEC keys stored in
// DNSSECKeysBucket under the object key cdnName.
//
// The bool result reports whether an object was found; a missing object is not
// an error. Whenever the returned error is non-nil the bool is false.
func GetDNSSECKeys(cdnName string, tx *sql.Tx, authOpts *riak.AuthOptions) (tc.DNSSECKeys, bool, error) {
	dnssecKeys := tc.DNSSECKeys{}
	exists := false

	// fetch is handed to WithClusterTx, which supplies the StorageCluster.
	fetch := func(cluster StorageCluster) error {
		objs, fetchErr := FetchObjectValues(cdnName, DNSSECKeysBucket, cluster)
		switch {
		case fetchErr != nil:
			return fetchErr
		case len(objs) == 0:
			return nil // nothing stored for this CDN
		}
		// Only the first returned object is decoded.
		if jsonErr := json.Unmarshal(objs[0].Value, &dnssecKeys); jsonErr != nil {
			return errors.New("unmarshalling Riak dnssec response: " + jsonErr.Error())
		}
		exists = true
		return nil
	}

	if err := WithClusterTx(tx, authOpts, fetch); err != nil {
		return dnssecKeys, false, err
	}
	return dnssecKeys, exists, nil
}
// PutDNSSECKeys JSON-encodes keys and saves the result to DNSSECKeysBucket
// under the object key cdnName.
//
// It returns an error if the keys cannot be marshalled or if saving fails.
func PutDNSSECKeys(keys tc.DNSSECKeys, cdnName string, tx *sql.Tx, authOpts *riak.AuthOptions) error {
	payload, marshalErr := json.Marshal(&keys)
	if marshalErr != nil {
		return errors.New("marshalling keys: " + marshalErr.Error())
	}

	// save is handed to WithClusterTx, which supplies the StorageCluster.
	save := func(cluster StorageCluster) error {
		saveErr := SaveObject(&riak.Object{
			ContentType:     "application/json",
			Charset:         "utf-8",
			ContentEncoding: "utf-8",
			Key:             cdnName,
			Value:           payload,
		}, DNSSECKeysBucket, cluster)
		if saveErr != nil {
			return errors.New("saving Riak object: " + saveErr.Error())
		}
		return nil
	}
	return WithClusterTx(tx, authOpts, save)
}
// GetBucketKey fetches the raw, undecoded value stored under key in the given
// bucket.
//
// The bool result reports whether an object was found; a missing object is not
// an error and yields an empty byte slice. Whenever the returned error is
// non-nil the bool is false.
func GetBucketKey(tx *sql.Tx, authOpts *riak.AuthOptions, bucket string, key string) ([]byte, bool, error) {
	raw := []byte{}
	exists := false

	// lookup is handed to WithClusterTx, which supplies the StorageCluster.
	lookup := func(cluster StorageCluster) error {
		objs, fetchErr := FetchObjectValues(key, bucket, cluster)
		if fetchErr != nil {
			return fetchErr
		}
		if len(objs) > 0 {
			// Only the first returned object is used.
			raw, exists = objs[0].Value, true
		}
		return nil
	}

	if err := WithClusterTx(tx, authOpts, lookup); err != nil {
		return raw, false, err
	}
	return raw, exists, nil
}
// DeleteDSSSLKeys deletes the SSL keys object "<ds>-<version>" from
// DeliveryServiceSSLKeysBucket. An empty version selects "<ds>-latest".
//
// It obtains a cluster via GetRiakClusterTx, starts it, and stops it again
// when the function returns; a failure to stop is logged, not returned.
// Errors getting or starting the cluster, or deleting the object, are returned.
func DeleteDSSSLKeys(tx *sql.Tx, authOpts *riak.AuthOptions, ds tc.DeliveryServiceName, version string) error {
	suffix := version
	if suffix == "" {
		suffix = "latest"
	}
	objKey := string(ds) + "-" + suffix

	cluster, clusterErr := GetRiakClusterTx(tx, authOpts)
	if clusterErr != nil {
		return errors.New("getting riak cluster: " + clusterErr.Error())
	}
	if startErr := cluster.Start(); startErr != nil {
		return errors.New("starting riak cluster: " + startErr.Error())
	}
	defer func() {
		stopErr := cluster.Stop()
		if stopErr == nil {
			return
		}
		log.Errorln("stopping Riak cluster: " + stopErr.Error())
	}()

	if deleteErr := DeleteObject(objKey, DeliveryServiceSSLKeysBucket, cluster); deleteErr != nil {
		return errors.New("deleting SSL keys: " + deleteErr.Error())
	}
	return nil
}
// GetURLSigConfigFileName returns the file name of the Apache Traffic Server
// URLSig config file for the delivery service ds, in the form
// "url_sig_<ds>.config".
// TODO move to ats config directory/file
func GetURLSigConfigFileName(ds tc.DeliveryServiceName) string {
	const (
		prefix = "url_sig_"
		suffix = ".config"
	)
	return prefix + string(ds) + suffix
}
// GetURLSigKeys fetches and JSON-decodes the URL signature keys stored in
// URLSigKeysBucket for the delivery service ds. The object key is the name
// returned by GetURLSigConfigFileName.
//
// The bool result reports whether an object was found; a missing object is not
// an error. Whenever the returned error is non-nil the bool is false.
func GetURLSigKeys(tx *sql.Tx, authOpts *riak.AuthOptions, ds tc.DeliveryServiceName) (tc.URLSigKeys, bool, error) {
	sigKeys := tc.URLSigKeys{}
	exists := false
	objKey := GetURLSigConfigFileName(ds)

	// fetch is handed to WithClusterTx, which supplies the StorageCluster.
	fetch := func(cluster StorageCluster) error {
		objs, fetchErr := FetchObjectValues(objKey, URLSigKeysBucket, cluster)
		switch {
		case fetchErr != nil:
			return fetchErr
		case len(objs) == 0:
			return nil // nothing stored for this delivery service
		}
		// Only the first returned object is decoded.
		if jsonErr := json.Unmarshal(objs[0].Value, &sigKeys); jsonErr != nil {
			return errors.New("unmarshalling Riak response: " + jsonErr.Error())
		}
		exists = true
		return nil
	}

	if err := WithClusterTx(tx, authOpts, fetch); err != nil {
		return sigKeys, false, err
	}
	return sigKeys, exists, nil
}
// PutURLSigKeys JSON-encodes keys and saves the result to URLSigKeysBucket
// under the object key returned by GetURLSigConfigFileName for ds.
//
// It returns an error if the keys cannot be marshalled or if saving fails.
func PutURLSigKeys(tx *sql.Tx, authOpts *riak.AuthOptions, ds tc.DeliveryServiceName, keys tc.URLSigKeys) error {
	payload, marshalErr := json.Marshal(&keys)
	if marshalErr != nil {
		return errors.New("marshalling keys: " + marshalErr.Error())
	}

	// save is handed to WithClusterTx, which supplies the StorageCluster.
	save := func(cluster StorageCluster) error {
		saveErr := SaveObject(&riak.Object{
			ContentType:     "application/json",
			Charset:         "utf-8",
			ContentEncoding: "utf-8",
			Key:             GetURLSigConfigFileName(ds),
			Value:           payload,
		}, URLSigKeysBucket, cluster)
		if saveErr != nil {
			return errors.New("saving Riak object: " + saveErr.Error())
		}
		return nil
	}
	return WithClusterTx(tx, authOpts, save)
}
// Parameters of the SSL keys search performed by GetCDNSSLKeysObj.
const (
	// SSLKeysIndex is the name of the search index passed to Search.
	SSLKeysIndex = "sslkeys"

	// CDNSSLKeysLimit is the result limit passed to Search.
	// TODO: emulates Perl; reevaluate?
	CDNSSLKeysLimit = 1000
)
// GetCDNSSLKeysObj searches the SSLKeysIndex for the SSL keys of the CDN
// cdnName and converts the matching documents with SearchDocsToCDNSSLKeys.
//
// The search uses the query "cdn:<cdnName>" with the filter "_yz_rk:*latest"
// (presumably restricting results to objects whose key ends in "latest" —
// confirm) and requests at most CDNSSLKeysLimit results.
//
// When nothing matches, an empty, non-nil slice is returned. On error the
// returned slice is nil.
func GetCDNSSLKeysObj(tx *sql.Tx, authOpts *riak.AuthOptions, cdnName string) ([]tc.CDNSSLKey, error) {
	result := []tc.CDNSSLKey{}

	// search is handed to WithClusterTx, which supplies the StorageCluster.
	search := func(cluster StorageCluster) error {
		// NOTE(review): cdnName is concatenated into the query unescaped;
		// confirm callers only pass trusted or validated names.
		docs, searchErr := Search(cluster, SSLKeysIndex, `cdn:`+cdnName, `_yz_rk:*latest`, CDNSSLKeysLimit)
		if searchErr != nil {
			return errors.New("riak search error: " + searchErr.Error())
		}
		if len(docs) > 0 {
			result = SearchDocsToCDNSSLKeys(docs)
		}
		return nil // with no matches, result stays empty
	}

	if err := WithClusterTx(tx, authOpts, search); err != nil {
		return nil, errors.New("with cluster error: " + err.Error())
	}
	return result, nil
}
// SearchDocsToCDNSSLKeys converts the SearchDoc slice returned by Riak into a
// CDNSSLKey slice, producing one key per document in the same order.
//
// For each document the first value of the "deliveryservice", "hostname",
// "certificate.crt" and "certificate.key" fields is copied into the key. If a
// SearchDoc doesn't contain an expected field, the corresponding key field is
// left as the empty string.
//
// The returned slice is never nil; it is empty when docs is empty.
func SearchDocsToCDNSSLKeys(docs []*riak.SearchDoc) []tc.CDNSSLKey {
	// Exactly one key is produced per document, so size the result up front.
	keys := make([]tc.CDNSSLKey, 0, len(docs))
	for _, doc := range docs {
		key := tc.CDNSSLKey{}
		if dss := doc.Fields["deliveryservice"]; len(dss) > 0 {
			key.DeliveryService = dss[0]
		}
		if hosts := doc.Fields["hostname"]; len(hosts) > 0 {
			key.HostName = hosts[0]
		}
		if crts := doc.Fields["certificate.crt"]; len(crts) > 0 {
			key.Certificate.Crt = crts[0]
		}
		// Named certKeys so it does not shadow the result slice keys.
		if certKeys := doc.Fields["certificate.key"]; len(certKeys) > 0 {
			key.Certificate.Key = certKeys[0]
		}
		keys = append(keys, key)
	}
	return keys
}