blob: 5761579a38a22500d54193a9564170cc0215cf0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tps
import (
"fmt"
"strconv"
"sync"
)
import (
"github.com/modern-go/concurrent"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)
const (
name = "method-service"
)
func init() {
extension.SetTpsLimiter(constant.DEFAULT_KEY, GetMethodServiceTpsLimiter)
extension.SetTpsLimiter(name, GetMethodServiceTpsLimiter)
}
// MethodServiceTpsLimiterImpl allows developer to config both method-level and service-level tps limiter.
/**
* for example:
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
* tps.limit.interval: 5000 # interval, the time unit is ms
* tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited.
* methods:
* - name: "GetUser"
* tps.interval: 3000
* tps.limit.rate: 20, # in this case, this configuration in service-level will be ignored.
* - name: "UpdateUser"
* tps.limit.rate: -1, # If the rate<0, the method will be ignored
*
*
* More examples:
* case1:
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
* tps.limit.interval: 5000 # interval, the time unit is ms
* tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited.
* methods:
* - name: "GetUser"
* - name: "UpdateUser"
* tps.limit.rate: -1,
* in this case, the method UpdateUser will be ignored,
* which means that only GetUser will be limited by service-level configuration.
*
* case2:
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
* tps.limit.interval: 5000 # interval, the time unit is ms
* tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited.
* methods:
* - name: "GetUser"
* - name: "UpdateUser"
* tps.limit.rate: 30,
* In this case, the GetUser will be limited by service-level configuration(300 times in 5000ms),
* but UpdateUser will be limited by its configuration (30 times in 60000ms)
*
* case3:
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
* methods:
* - name: "GetUser"
* - name: "UpdateUser"
* tps.limit.rate: 70,
* tps.limit.interval: 40000
* In this case, only UpdateUser will be limited by its configuration (70 times in 40000ms)
*/
type MethodServiceTpsLimiterImpl struct {
tpsState *concurrent.Map
}
// IsAllowable based on method-level and service-level.
// The method-level has high priority which means that if there is any rate limit configuration for the method,
// the service-level rate limit strategy will be ignored.
// The key point is how to keep thread-safe
// This implementation use concurrent map + loadOrStore to make implementation thread-safe
// You can image that even multiple threads create limiter, but only one could store the limiter into tpsState
func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool {
methodConfigPrefix := "methods." + invocation.MethodName() + "."
methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "")
methodIntervalConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "")
// service-level tps limit
limitTarget := url.ServiceKey()
// method-level tps limit
if len(methodIntervalConfig) > 0 || len(methodLimitRateConfig) > 0 {
// it means that if the method-level rate limit exist, we will use method-level rate limit strategy
limitTarget = limitTarget + "#" + invocation.MethodName()
}
// looking up the limiter from 'cache'
limitState, found := limiter.tpsState.Load(limitTarget)
if found {
// the limiter has been cached, we return its result
return limitState.(filter.TpsLimitStrategy).IsAllowable()
}
// we could not find the limiter, and try to create one.
limitRate := getLimitConfig(methodLimitRateConfig, url, invocation,
constant.TPS_LIMIT_RATE_KEY,
constant.DEFAULT_TPS_LIMIT_RATE)
if limitRate < 0 {
// the limitTarget is not necessary to be limited.
return true
}
limitInterval := getLimitConfig(methodIntervalConfig, url, invocation,
constant.TPS_LIMIT_INTERVAL_KEY,
constant.DEFAULT_TPS_LIMIT_INTERVAL)
if limitInterval <= 0 {
panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String()))
}
// find the strategy config and then create one
limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY,
url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY))
limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
// we using loadOrStore to ensure thread-safe
limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval)))
return limitState.(filter.TpsLimitStrategy).IsAllowable()
}
// getLimitConfig will try to fetch the configuration from url.
// If we can convert the methodLevelConfig to int64, return;
// Or, we will try to look up server-level configuration and then convert it to int64
func getLimitConfig(methodLevelConfig string,
url common.URL,
invocation protocol.Invocation,
configKey string,
defaultVal string) int64 {
if len(methodLevelConfig) > 0 {
result, err := strconv.ParseInt(methodLevelConfig, 0, 0)
if err != nil {
panic(fmt.Sprintf("The %s for invocation %s # %s must be positive, please check your configuration!",
configKey, url.ServiceKey(), invocation.MethodName()))
}
return result
}
// actually there is no method-level configuration, so we use the service-level configuration
result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 0)
if err != nil {
panic(fmt.Sprintf("Cannot parse the configuration %s, please check your configuration!", configKey))
}
return result
}
var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl
var methodServiceTpsLimiterOnce sync.Once
// GetMethodServiceTpsLimiter will return an MethodServiceTpsLimiterImpl instance.
func GetMethodServiceTpsLimiter() filter.TpsLimiter {
methodServiceTpsLimiterOnce.Do(func() {
methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{
tpsState: concurrent.NewMap(),
}
})
return methodServiceTpsLimiterInstance
}