blob: 266e75d1cf5b1cc04fa66a7bd0b2a196914ce9b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"regexp"
"strconv"
"strings"
"sync"
"time"
)
import (
fc "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
fr "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit"
etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/common/yaml"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)
// Package-level state shared by the loaders and the etcd event handlers in this file.
var (
// apiConfig is the currently loaded API configuration; GetAPIConf returns a copy of it.
apiConfig *fc.APIConfig
// once is not referenced in the visible code — NOTE(review): confirm whether it is still needed.
once sync.Once
// client is the etcd client created by LoadAPIConfig and used by the watch loop.
client *etcdv3.Client
// listener is notified about resource, method and rate-limit changes; set by RegisterConfigListener.
listener APIConfigResourceListener
// lock is held by initAPIConfigFromKVList and by the put/delete event handlers while they touch apiConfig.
lock sync.RWMutex
)
// Keys that initBaseInfoFromString looks up in the parsed base-info YAML mapping.
var (
// BASE_INFO_NAME is the key whose value is copied into APIConfig.Name.
BASE_INFO_NAME = "name"
// BASE_INFO_DESC is the key whose value is copied into APIConfig.Description.
BASE_INFO_DESC = "description"
)
// APIConfigResourceListener defines the callbacks invoked when the API
// configuration held by this package changes: resources, the methods below a
// resource, and the rate-limit config.
type APIConfigResourceListener interface {
// ResourceChange handles a modified resource; new is the updated value, old the one it replaced.
ResourceChange(new fc.Resource, old fc.Resource) bool // NOTE(review): the bool results are ignored by the callers in this file; their meaning is up to the implementation — confirm
// ResourceAdd handles a newly added resource.
ResourceAdd(res fc.Resource) bool
// ResourceDelete handles a removed resource.
ResourceDelete(deleted fc.Resource) bool
// MethodChange handles a modified method; method is the updated value, old the one it replaced.
MethodChange(res fc.Resource, method fc.Method, old fc.Method) bool
// MethodAdd handles a method added below the resource res.
MethodAdd(res fc.Resource, method fc.Method) bool
// MethodDelete handles a method removed from the resource res.
MethodDelete(res fc.Resource, method fc.Method) bool
// RateLimitChange handles a replaced rate-limit config; it receives an empty
// config when the rate-limit key is deleted.
RateLimitChange(*fr.Config)
}
// LoadAPIConfigFromFile loads the API configuration from the YAML file at path.
//
// On success the parsed configuration is also stored as the package-level
// apiConfig, which is what GetAPIConf returns. An empty path, or a failure of
// yaml.UnmarshalYMLConfig, yields a nil config and a non-nil error.
func LoadAPIConfigFromFile(path string) (*fc.APIConfig, error) {
	if len(path) == 0 {
		return nil, perrors.Errorf("Config file not specified")
	}
	logger.Infof("Load API configuration file from %s", path)

	apiConf := &fc.APIConfig{}
	if err := yaml.UnmarshalYMLConfig(path, apiConf); err != nil {
		return nil, perrors.Errorf("unmarshalYmlConfig error %s", perrors.WithStack(err))
	}

	apiConfig = apiConf
	return apiConf, nil
}
// LoadAPIConfig loads the API configuration from the etcd config center
// described by metaConfig.
//
// It creates an etcd v3 client for the comma-separated endpoints in
// metaConfig.Address, fetches the children of metaConfig.APIConfigPath, builds
// the package-level apiConfig from them and then starts a goroutine that
// watches the same path for later changes. A nil metaConfig, a client creation
// failure, a fetch failure or a parse failure returns a nil config and an error.
func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) {
	// Guard against a nil pointer instead of panicking on the field accesses below.
	if metaConfig == nil {
		return nil, perrors.Errorf("api meta config is nil")
	}

	tmpClient, err := etcdv3.NewConfigClientWithErr(
		etcdv3.WithName(etcdv3.RegistryETCDV3Client),
		etcdv3.WithTimeout(10*time.Second),
		etcdv3.WithEndpoints(strings.Split(metaConfig.Address, ",")...),
	)
	if err != nil {
		return nil, perrors.Errorf("Init etcd client fail error %s", err)
	}
	client = tmpClient

	kList, vList, err := client.GetChildren(metaConfig.APIConfigPath)
	if err != nil {
		return nil, perrors.Errorf("Get remote config fail error %s", err)
	}
	if err = initAPIConfigFromKVList(kList, vList); err != nil {
		return nil, err
	}
	// TODO: init other setting which need fetch from remote

	go listenResourceAndMethodEvent(metaConfig.APIConfigPath)
	// TODO: watch other setting which need fetch from remote
	return apiConfig, nil
}
func initAPIConfigFromKVList(kList, vList []string) error {
var skList, svList, mkList, mvList []string
var baseInfo string
var rateLimit string
for i, k := range kList {
v := vList[i]
//handle base info
re := getCheckBaseInfoRegexp()
if m := re.Match([]byte(k)); m {
baseInfo = v
continue
}
// handle resource
re = getCheckResourceRegexp()
if m := re.Match([]byte(k)); m {
skList = append(skList, k)
svList = append(svList, v)
continue
}
// handle method
re = getExtractMethodRegexp()
if m := re.Match([]byte(k)); m {
mkList = append(mkList, k)
mvList = append(mvList, v)
continue
}
//handle rate limit config
re = getCheckRatelimitRegexp()
if m := re.Match([]byte(k)); m {
rateLimit = v
continue
}
}
lock.Lock()
defer lock.Unlock()
tmpApiConf := &fc.APIConfig{}
if err := initBaseInfoFromString(tmpApiConf, baseInfo); err != nil {
logger.Errorf("initBaseInfoFromString error %s", err)
return err
}
if err := initAPIConfigServiceFromKvList(tmpApiConf, skList, svList); err != nil {
logger.Errorf("initAPIConfigServiceFromKvList error %s", err)
return err
}
if err := initAPIConfigMethodFromKvList(tmpApiConf, mkList, mvList); err != nil {
logger.Errorf("initAPIConfigMethodFromKvList error %s", err)
return err
}
if err := initAPIConfigRatelimitFromString(tmpApiConf, rateLimit); err != nil {
logger.Errorf("initAPIConfigRatelimitFromString error %s", err)
return err
}
apiConfig = tmpApiConf
return nil
}
// initBaseInfoFromString parses str as a YAML mapping of string to string and
// copies the BASE_INFO_NAME and BASE_INFO_DESC entries, when present, into
// conf.Name and conf.Description. Fields whose key is absent are left
// untouched. A parse failure is logged and returned.
func initBaseInfoFromString(conf *fc.APIConfig, str string) error {
	props := make(map[string]string, 8)
	err := yaml.UnmarshalYML([]byte(str), props)
	if err != nil {
		logger.Errorf("unmarshalYmlConfig error %s", err)
		return err
	}

	if name, present := props[BASE_INFO_NAME]; present {
		conf.Name = name
	}
	if desc, present := props[BASE_INFO_DESC]; present {
		conf.Description = desc
	}
	return nil
}
// initAPIConfigRatelimitFromString parses str as a YAML rate-limit config and
// stores the result in conf.RateLimit. A parse failure is logged and returned,
// leaving conf.RateLimit unchanged.
func initAPIConfigRatelimitFromString(conf *fc.APIConfig, str string) error {
	var parsed fr.Config
	err := yaml.UnmarshalYML([]byte(str), &parsed)
	if err == nil {
		conf.RateLimit = parsed
		return nil
	}
	logger.Errorf("unmarshalYmlConfig error %s", err)
	return err
}
// initAPIConfigMethodFromKvList parses each value in vList as a YAML method
// definition and merges it into config.
//
// A method is attached to the resource whose Path equals the method's
// ResourcePath: an existing method with the same HTTPVerb is overwritten,
// otherwise the method is appended. When no resource has that path, a new
// resource carrying only the path and this method is appended to config.
// kList only determines how many entries are processed. The first parse
// failure is logged and returned.
func initAPIConfigMethodFromKvList(config *fc.APIConfig, kList, vList []string) error {
	for idx := range kList {
		var method fc.Method
		if err := yaml.UnmarshalYML([]byte(vList[idx]), &method); err != nil {
			logger.Errorf("unmarshalYmlConfig error %s", err)
			return err
		}

		// placed stays true once the method has been stored anywhere, so a
		// later resource with the same path is only updated, never appended to.
		placed := false
		for pos := range config.Resources {
			target := config.Resources[pos]
			if target.Path != method.ResourcePath {
				continue
			}
			for m := range target.Methods {
				if target.Methods[m].HTTPVerb == method.HTTPVerb {
					// modify one method
					target.Methods[m] = method
					placed = true
				}
			}
			if placed {
				continue
			}
			target.Methods = append(target.Methods, method)
			config.Resources[pos] = target
			placed = true
		}
		if placed {
			continue
		}

		// no resource with that path yet, so add a placeholder resource first
		var fresh fc.Resource
		fresh.Methods = append(fresh.Methods, method)
		fresh.Path = method.ResourcePath
		config.Resources = append(config.Resources, fresh)
	}
	return nil
}
// initAPIConfigServiceFromKvList parses each value in vList as a YAML resource
// definition and merges it into config.Resources.
//
// A resource whose Path already exists replaces the stored one but keeps the
// stored method list; otherwise it is appended. kList only determines how many
// entries are processed. The first parse failure is logged and returned.
func initAPIConfigServiceFromKvList(config *fc.APIConfig, kList, vList []string) error {
	for idx := range kList {
		var parsed fc.Resource
		if err := yaml.UnmarshalYML([]byte(vList[idx]), &parsed); err != nil {
			logger.Errorf("unmarshalYmlConfig error %s", err)
			return err
		}

		if config.Resources == nil {
			config.Resources = make([]fc.Resource, 0)
		}

		replaced := false
		for pos := range config.Resources {
			if config.Resources[pos].Path != parsed.Path {
				continue
			}
			// replace old with new one except method list
			parsed.Methods = config.Resources[pos].Methods
			config.Resources[pos] = parsed
			replaced = true
		}
		if !replaced {
			config.Resources = append(config.Resources, parsed)
		}
	}
	return nil
}
// listenResourceAndMethodEvent watches every etcd key below the prefix key and
// forwards each PUT/DELETE event to handlePutEvent / handleDeleteEvent.
//
// The watch is opened once and reused for consecutive responses; it is only
// re-opened after a response that carries an error. Previously a new watch was
// opened after every response, leaving the earlier watch open and opening a
// window in which events could be missed.
//
// It blocks until the watch cannot be opened, the client reports Done, or the
// watch channel is closed, and returns false in all of those cases.
func listenResourceAndMethodEvent(key string) bool {
	for {
		wc, err := client.WatchWithPrefix(key)
		if err != nil {
			logger.Warnf("Watch api config {key:%s} = error{%s}", key, err)
			return false
		}

	watching:
		for {
			select {
			// client stopped
			case <-client.Done():
				logger.Warnf("client stopped")
				return false
			// handle etcd events
			case e, ok := <-wc:
				if !ok {
					logger.Warnf("watch-chan closed")
					return false
				}
				if e.Err() != nil {
					logger.Errorf("watch ERR {err: %s}", e.Err())
					// Leave the inner loop so the outer loop opens a fresh watch.
					break watching
				}
				for _, event := range e.Events {
					switch event.Type {
					case mvccpb.PUT:
						logger.Infof("get event (key{%s}) = event{EventNodePut}", event.Kv.Key)
						handlePutEvent(event.Kv.Key, event.Kv.Value)
					case mvccpb.DELETE:
						logger.Infof("get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
						handleDeleteEvent(event.Kv.Key, event.Kv.Value)
					default:
						logger.Infof("get event (key{%s}) = event{%d}", event.Kv.Key, event.Type)
					}
				}
			}
		}
	}
}
// handleDeleteEvent applies an etcd DELETE event to the in-memory API config
// while holding lock.
//
// The key is classified by the first matching pattern:
//   - resource key (".../resources/{id}"): the resource with that numeric id is removed;
//   - method key (".../resources/{id}/method/{id}"): that method is removed from the resource;
//   - rate-limit key: the stored rate-limit config is reset to its zero value
//     and the listener is notified with the same empty config.
//
// Keys matching none of the patterns, and ids that are not integers, are
// ignored (the latter with an error log). val is unused.
func handleDeleteEvent(key, val []byte) {
	lock.Lock()
	defer lock.Unlock()

	// A trailing slash is allowed by the key patterns; drop it so the last
	// path segment is always the id.
	keyStr := strings.TrimSuffix(string(key), "/")
	pathArray := strings.Split(keyStr, "/")

	switch {
	case getCheckResourceRegexp().Match(key):
		resourceID, err := strconv.Atoi(pathArray[len(pathArray)-1])
		if err != nil {
			logger.Errorf("handleDeleteEvent ID is not int error %s", err)
			return
		}
		deleteApiConfigResource(resourceID)

	case getExtractMethodRegexp().Match(key):
		// Expected tail: {resourceID}/method/{methodID}
		if len(pathArray) < 3 {
			logger.Errorf("handleDeleteEvent key format error")
			return
		}
		resourceID, err := strconv.Atoi(pathArray[len(pathArray)-3])
		if err != nil {
			logger.Errorf("handleDeleteEvent ID is not int error %s", err)
			return
		}
		methodID, err := strconv.Atoi(pathArray[len(pathArray)-1])
		if err != nil {
			logger.Errorf("handleDeleteEvent ID is not int error %s", err)
			return
		}
		deleteApiConfigMethod(resourceID, methodID)

	case getCheckRatelimitRegexp().Match(key):
		empty := &fr.Config{}
		// Keep the stored config in step with what the listener is told,
		// mirroring what mergeRatelimit does for PUT events.
		apiConfig.RateLimit = *empty
		listener.RateLimitChange(empty)
	}
}
// handlePutEvent applies an etcd PUT event to the in-memory API config while
// holding lock.
//
// The key is classified by the first matching pattern, in this order:
// resource, method, base info, rate limit. Resource and method values are
// parsed from YAML before merging; a parse failure is logged and the event is
// dropped. Keys matching none of the patterns are ignored.
func handlePutEvent(key, val []byte) {
	lock.Lock()
	defer lock.Unlock()

	switch {
	case getCheckResourceRegexp().Match(key):
		var updated fc.Resource
		if err := yaml.UnmarshalYML(val, &updated); err != nil {
			logger.Errorf("handlePutEvent UnmarshalYML error %s", err)
			return
		}
		mergeApiConfigResource(updated)

	case getExtractMethodRegexp().Match(key):
		var updated fc.Method
		if err := yaml.UnmarshalYML(val, &updated); err != nil {
			logger.Errorf("handlePutEvent UnmarshalYML error %s", err)
			return
		}
		mergeApiConfigMethod(updated.ResourcePath, updated)

	case getCheckBaseInfoRegexp().Match(key):
		// handle base info
		mergeBaseInfo(val)

	case getCheckRatelimitRegexp().Match(key):
		// handle ratelimit
		mergeRatelimit(val)
	}
}
// deleteApiConfigResource removes the first resource whose ID equals
// resourceId from apiConfig.Resources and reports the removed resource to the
// listener. Nothing happens when no resource has that ID.
func deleteApiConfigResource(resourceId int) {
	for idx, candidate := range apiConfig.Resources {
		if candidate.ID != resourceId {
			continue
		}
		// candidate is a copy taken before the slice is shifted, so it still
		// describes the removed resource.
		apiConfig.Resources = append(apiConfig.Resources[:idx], apiConfig.Resources[idx+1:]...)
		listener.ResourceDelete(candidate)
		return
	}
}
// mergeApiConfigResource stores val in apiConfig.Resources.
//
// When a resource with the same ID exists it is replaced by val, which
// inherits the existing method list, and the listener receives ResourceChange
// with the new and the previous value. Otherwise val is appended and the
// listener receives ResourceAdd.
func mergeApiConfigResource(val fc.Resource) {
	for idx := range apiConfig.Resources {
		existing := apiConfig.Resources[idx]
		if existing.ID == val.ID {
			// modify one resource, keeping the methods already attached to it
			val.Methods = existing.Methods
			apiConfig.Resources[idx] = val
			listener.ResourceChange(val, existing)
			return
		}
	}

	// add one resource
	apiConfig.Resources = append(apiConfig.Resources, val)
	listener.ResourceAdd(val)
}
// mergeRatelimit parses val as a YAML rate-limit config, stores it in
// apiConfig.RateLimit and passes it to the listener. A parse failure is logged
// and leaves both the stored config and the listener untouched.
func mergeRatelimit(val []byte) {
	parsed := new(fr.Config)
	err := yaml.UnmarshalYML(val, parsed)
	if err != nil {
		logger.Errorf("unmarshalYmlConfig error %s", err)
		return
	}

	apiConfig.RateLimit = *parsed
	listener.RateLimitChange(parsed)
}
// mergeBaseInfo applies the YAML base info in val to the package-level
// apiConfig via initBaseInfoFromString.
func mergeBaseInfo(val []byte) {
	raw := string(val)
	// The error is deliberately dropped: initBaseInfoFromString logs a parse
	// failure itself and there is no caller to report it to.
	_ = initBaseInfoFromString(apiConfig, raw)
}
// deleteApiConfigMethod removes the method with ID methodId from the resource
// with ID resourceId and reports the updated resource plus the removed method
// to the listener. Nothing happens when either ID is not found.
//
// The resource is accessed through a pointer into apiConfig.Resources so the
// shortened method list is stored in the config. Ranging over a copy of the
// resource (as before) left the stored slice at its old length over a shifted
// backing array: the deleted entry vanished but the last entry was duplicated.
func deleteApiConfigMethod(resourceId, methodId int) {
	for r := range apiConfig.Resources {
		resource := &apiConfig.Resources[r]
		if resource.ID != resourceId {
			continue
		}
		for i, method := range resource.Methods {
			if method.ID != methodId {
				continue
			}
			// method is a copy taken before the slice is shifted.
			resource.Methods = append(resource.Methods[:i], resource.Methods[i+1:]...)
			listener.MethodDelete(*resource, method)
			return
		}
	}
}
// mergeApiConfigMethod stores val on the resource(s) whose Path equals path.
//
// When the resource already has a method with the same ID, that method is
// replaced, the listener receives MethodChange with the new and the previous
// value, and the function returns. Otherwise val is appended to the resource
// and the listener receives MethodAdd. When no resource has the given path the
// method cannot be attached anywhere; this is now logged as a warning instead
// of being dropped silently.
func mergeApiConfigMethod(path string, val fc.Method) {
	matched := false
	for i, resource := range apiConfig.Resources {
		if path != resource.Path {
			continue
		}
		matched = true
		for j, method := range resource.Methods {
			if method.ID == val.ID {
				// modify one method
				resource.Methods[j] = val
				listener.MethodChange(resource, val, method)
				apiConfig.Resources[i] = resource
				return
			}
		}
		// add one method
		resource.Methods = append(resource.Methods, val)
		apiConfig.Resources[i] = resource
		listener.MethodAdd(resource, val)
	}
	if !matched {
		logger.Warnf("mergeApiConfigMethod: no resource with path %s, method %d is ignored", path, val.ID)
	}
}
// checkBaseInfoRegexp is compiled once at package initialisation; a
// *regexp.Regexp is safe for concurrent use, so it can be shared by all callers.
var checkBaseInfoRegexp = regexp.MustCompile(".+/base$")

// getCheckBaseInfoRegexp returns the matcher for base-info keys: any non-empty
// prefix followed by "/base" at the end of the key. The same compiled
// instance is returned on every call.
func getCheckBaseInfoRegexp() *regexp.Regexp {
	return checkBaseInfoRegexp
}
// checkResourceRegexp is compiled once at package initialisation; a
// *regexp.Regexp is safe for concurrent use, so it can be shared by all callers.
var checkResourceRegexp = regexp.MustCompile(".+/resources/[^/]+/?$")

// getCheckResourceRegexp returns the matcher for resource keys: a non-empty
// prefix, "/resources/", one path segment and an optional trailing slash. The
// same compiled instance is returned on every call.
func getCheckResourceRegexp() *regexp.Regexp {
	return checkResourceRegexp
}
// extractMethodRegexp is compiled once at package initialisation; a
// *regexp.Regexp is safe for concurrent use, so it can be shared by all callers.
var extractMethodRegexp = regexp.MustCompile(".+/resources/([^/]+)/method/[^/]+/?$")

// getExtractMethodRegexp returns the matcher for method keys: a non-empty
// prefix, "/resources/{segment}/method/{segment}" and an optional trailing
// slash. The resource segment is the first capture group. The same compiled
// instance is returned on every call.
func getExtractMethodRegexp() *regexp.Regexp {
	return extractMethodRegexp
}
// checkRatelimitRegexp is compiled once at package initialisation; a
// *regexp.Regexp is safe for concurrent use, so it can be shared by all callers.
var checkRatelimitRegexp = regexp.MustCompile(".+/filter/ratelimit")

// getCheckRatelimitRegexp returns the matcher for rate-limit keys: any key
// containing "/filter/ratelimit" after a non-empty prefix. The pattern is not
// anchored at the end, so longer keys match as well. The same compiled
// instance is returned on every call.
func getCheckRatelimitRegexp() *regexp.Regexp {
	return checkRatelimitRegexp
}
// RegisterConfigListener sets the package-level listener that the put/delete
// event handlers in this file notify about resource, method and rate-limit
// changes. A later call replaces the previously registered listener.
func RegisterConfigListener(li APIConfigResourceListener) {
listener = li
}
// GetAPIConf returns a copy of the currently loaded API config.
//
// The copy is shallow: slice fields such as Resources still share their
// backing storage with the package state. When no config has been loaded yet
// the zero-value config is returned instead of dereferencing a nil pointer.
func GetAPIConf() fc.APIConfig {
	if apiConfig == nil {
		return fc.APIConfig{}
	}
	return *apiConfig
}