blob: 2e40d5e140b536b6647e8746a1955b7fcfe45b2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <climits>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <vector>

#include "tscore/ink_config.h"
#include "ts/ts.h"

#include <yaml-cpp/yaml.h>

#include "utilities.h"
// How often the BucketManager thread tops up every rate bucket.
constexpr auto BUCKET_REFILL_INTERVAL = std::chrono::milliseconds{25}; // Increase rate limit buckets every 25ms
// How often queued (delayed) transactions are re-examined.
constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{300}; // Examine the queue every 300ms

// Timestamp type used to tag entries when they enter the delay queue.
using QueueTime = std::chrono::time_point<std::chrono::system_clock>;

// Indices into the per-limiter metrics array (sized by RATE_LIMITER_METRIC_MAX).
// No metric for requests we accept; accepted requests should be counted under their usual metrics
enum {
  RATE_LIMITER_METRIC_QUEUED = 0, // Requests placed on the delay queue
  RATE_LIMITER_METRIC_REJECTED,   // Requests rejected
  RATE_LIMITER_METRIC_EXPIRED,    // Requests that aged out of the queue
  RATE_LIMITER_METRIC_RESUMED,    // Requests released from the queue
  RATE_LIMITER_METRIC_MAX         // Sentinel: number of metrics
};

// Continuation handler that drives the periodic bucket refills; defined elsewhere.
int bucket_refill_cont(TSCont cont, TSEvent event, void *edata);
// Singleton owning all rate buckets, refilling them from a dedicated thread
// every BUCKET_REFILL_INTERVAL. Buckets are shared (via shared_ptr) with the
// limiters that consume from them.
class BucketManager
{
  using self_type = BucketManager;

public:
  // A simple token bucket: starts empty, topped up periodically by the
  // manager thread, consumed lock-free by request threads.
  class RateBucket
  {
    using self_type = RateBucket;

  public:
    // max == steady-state rate limit (tokens per second)
    RateBucket(uint32_t max) : _count(0), _max(max) {}
    ~RateBucket() = default;

    RateBucket(self_type &&)                = delete;
    self_type &operator=(const self_type &) = delete;
    self_type &operator=(self_type &&)      = delete;

    // Current number of available tokens.
    uint32_t
    count() const
    {
      return _count.load(std::memory_order_acquire);
    }

    // Take one token if any is available. Returns true when a token was
    // consumed, false when the bucket was empty (i.e. rate exceeded).
    bool
    consume()
    {
      uint32_t val = _count.load(std::memory_order_acquire);

      while (val > 0) {
        if (_count.compare_exchange_weak(val, val - 1, std::memory_order_release, std::memory_order_acquire)) {
          break;
        }
      }
      TSReleaseAssert(val <= _max);

      return val > 0;
    }

    // This should only be called from the manager, as such no locking is needed
  private:
    friend class BucketManager;

    void
    refill()
    {
      // Note: this must NOT be a function-local static. A static would be
      // computed once, from the _max of whichever bucket refills first, and
      // then silently shared by every other bucket. Each bucket has its own
      // _max, so the per-interval amount is computed per call.
      // ToDo: for _max < (1000ms / interval) this truncates to 0 and the
      // bucket never refills; such low rates need a remainder accumulator.
      const uint32_t amount = _max / (1000 / BUCKET_REFILL_INTERVAL.count());
      uint32_t       old    = _count.load(std::memory_order_acquire);
      uint32_t       nval;

      do {
        nval = old + amount;
      } while (!_count.compare_exchange_weak(old, std::min(nval, _max), std::memory_order_release, std::memory_order_acquire));
    }

    std::atomic<uint32_t> _count;
    uint32_t              _max;
  }; // End class RateBucket

  BucketManager() = default;
  ~BucketManager()
  {
    if (_running) {
      _running = false; // Signal the refill thread to stop
      _thread.join();   // Wait for the thread to finish
    }
  }

  BucketManager(self_type &)              = delete;
  BucketManager(self_type &&)             = delete;
  self_type &operator=(const self_type &) = delete;
  self_type &operator=(self_type &&)      = delete;

  static BucketManager &
  getInstance()
  {
    static self_type instance;
    return instance;
  }

  void refill_thread();

  // Create a new bucket and register it for periodic refills. Lazily starts
  // the refill thread on first use.
  std::shared_ptr<RateBucket>
  add(uint32_t max)
  {
    auto bucket = std::make_shared<RateBucket>(max);

    std::lock_guard<std::mutex> lock(_mutex);

    if (!_running) {
      _running = true;
      _thread  = std::thread(&BucketManager::refill_thread, this);
    }
    _buckets.push_back(bucket);

    return bucket;
  }

  // Unregister a bucket; it stops being refilled but stays alive as long as
  // someone else holds the shared_ptr.
  void
  remove(const std::shared_ptr<RateBucket> &bucket) // const-ref: no refcount churn
  {
    std::lock_guard<std::mutex> lock(_mutex);
    auto                        it = std::find(_buckets.begin(), _buckets.end(), bucket);

    if (it != _buckets.end()) {
      _buckets.erase(it);
    }
  }

private:
  std::vector<std::shared_ptr<RateBucket>> _buckets;
  std::mutex                               _mutex;           // Protect the bucket list
  std::atomic<bool>                        _running = false; // Atomic: written here, read by the refill thread
  std::thread                              _thread;          // The thread refilling the buckets
}; // End class BucketManager
// Limiter flavors; used when building metric names (see metric_helper()).
enum { RATE_LIMITER_TYPE_SNI = 0, RATE_LIMITER_TYPE_REMAP, RATE_LIMITER_TYPE_MAX };
// Outcome of RateLimiter::reserve().
enum class ReserveStatus { UNLIMITED = 0, RESERVED, FULL, HIGH_RATE };

// Default prefix for all metrics created by this plugin.
// NOTE(review): `static` in a header gives each TU its own copy; `inline constexpr` would avoid that — verify before changing.
static const char *RATE_LIMITER_METRIC_PREFIX = "plugin.rate_limiter";

// Creates the stat handles for one limiter and stores their IDs in `metrics`
// (TS_ERROR for stats that could not be created); implementation elsewhere.
void metric_helper(std::array<int, RATE_LIMITER_METRIC_MAX> &metrics, uint type, const std::string &tag, const std::string &name,
                   const std::string &prefix = RATE_LIMITER_METRIC_PREFIX);
///////////////////////////////////////////////////////////////////////////////
// Base class for all limiters
//
template <class T> class RateLimiter
{
using QueueItem = std::tuple<T, TSCont, QueueTime>;
using self_type = RateLimiter;
public:
RateLimiter() = default;
RateLimiter(self_type &&) = delete;
self_type &operator=(const self_type &) = delete;
self_type &operator=(self_type &&) = delete;
virtual ~RateLimiter() { BucketManager::getInstance().remove(_bucket); }
void
initializeMetrics(uint type, std::string tag, std::string prefix = RATE_LIMITER_METRIC_PREFIX)
{
TSReleaseAssert(type < RATE_LIMITER_TYPE_MAX);
metric_helper(_metrics, type, tag, name(), prefix);
}
virtual bool
parseYaml(const YAML::Node &node)
{
if (node["limit"]) {
_limit = node["limit"].as<uint32_t>();
}
if (node["rate"]) {
_limit = node["rate"].as<uint32_t>();
}
// ToDo: One or both of these should be required
const YAML::Node &queue = node["queue"];
// If enabled, we default to UINT32_MAX, but the object default is still 0 (no queue)
if (queue) {
_max_queue = queue["size"] ? queue["size"].as<uint32_t>() : UINT32_MAX;
if (queue["max_age"]) {
_max_age = std::chrono::seconds(queue["max_age"].as<uint32_t>());
}
}
const YAML::Node &metrics = node["metrics"];
if (metrics) {
std::string prefix = metrics["prefix"] ? metrics["prefix"].as<std::string>() : RATE_LIMITER_METRIC_PREFIX;
std::string tag = metrics["tag"] ? metrics["tag"].as<std::string>() : name();
Dbg(dbg_ctl, "Metrics for selector rule: %s(%s, %s)", name().c_str(), prefix.c_str(), tag.c_str());
initializeMetrics(RATE_LIMITER_TYPE_SNI, prefix, tag);
}
return true;
}
// Add a rate bucket for this limiter
void
addBucket()
{
TSAssert(_rate > 0);
_bucket = BucketManager::getInstance().add(_rate);
}
// Reserve / release a slot from the active resource limits.
ReserveStatus
reserve()
{
if (_rate > 0) {
if (!_bucket->consume()) {
Dbg(dbg_ctl, "Rate limit exceeded");
return ReserveStatus::HIGH_RATE;
} else {
Dbg(dbg_ctl, "Rate limit OK, count() == %u", _bucket->count());
}
}
if (!has_limit()) { // If we have no limits and not at rate
return ReserveStatus::UNLIMITED;
}
std::lock_guard<std::mutex> lock(_active_lock);
TSReleaseAssert(_active <= _limit);
if (_active < _limit) {
++_active;
Dbg(dbg_ctl, "Reserving a slot, active entities == %u", _active.load());
return ReserveStatus::RESERVED;
}
return ReserveStatus::FULL;
}
void
free()
{
{
std::lock_guard<std::mutex> lock(_active_lock);
--_active;
}
Dbg(dbg_ctl, "Releasing a slot, active entities == %u", _active.load());
}
// Current size of the active_in connections
uint32_t
active() const
{
return _active.load();
}
// Current size of the queue
uint32_t
size() const
{
return _size.load();
}
// Is the queue full (at it's max size)?
bool
full() const
{
return (_size >= max_queue());
}
void
push(T elem, TSCont cont)
{
QueueTime now = std::chrono::system_clock::now();
std::lock_guard<std::mutex> lock(_queue_lock);
_queue.push_front(std::make_tuple(elem, cont, now));
++_size;
}
QueueItem
pop()
{
QueueItem item;
std::lock_guard<std::mutex> lock(_queue_lock);
if (!_queue.empty()) {
item = std::move(_queue.back());
_queue.pop_back();
--_size;
}
return item;
}
void
incrementMetric(uint metric)
{
if (_metrics[metric] != TS_ERROR) {
TSStatIntIncrement(_metrics[metric], 1);
}
}
bool
hasOldEntity(QueueTime now)
{
std::lock_guard<std::mutex> lock(_queue_lock);
if (!_queue.empty()) {
QueueItem item = _queue.back();
std::chrono::milliseconds age = std::chrono::duration_cast<std::chrono::milliseconds>(now - std::get<2>(item));
return (age >= max_age());
}
return false;
}
const std::string &
name() const
{
return _name;
}
uint32_t
limit() const
{
return _limit;
}
bool
has_limit() const
{
return _limit != UINT32_MAX && _limit != 0;
}
uint32_t
rate() const
{
return _rate;
}
uint32_t
max_queue() const
{
return _max_queue;
}
std::chrono::milliseconds
max_age() const
{
return _max_age;
}
void
setName(const std::string &name)
{
_name = name;
}
protected:
std::string _name = "_limiter_"; // The name/descr (e.g. SNI name) of this limiter
uint32_t _limit = UINT32_MAX; // No limit unless specified ...
uint32_t _rate = 0; // Rate limit (if any)
uint32_t _max_queue = 0; // No queue by default
std::chrono::milliseconds _max_age = std::chrono::milliseconds::zero(); // Max age (ms) in the queue
private:
std::atomic<uint32_t> _active = 0; // Current active number of txns. This has to always stay <= limit above
std::atomic<uint32_t> _size = 0; // Current size of the pending queue of txns. This should aim to be < _max_queue
std::mutex _queue_lock, _active_lock; // Resource locks
std::deque<QueueItem> _queue; // Queue for the pending TXN's. ToDo: Should also move (see below)
std::array<int, RATE_LIMITER_METRIC_MAX> _metrics{};
std::shared_ptr<BucketManager::RateBucket> _bucket; // The rate bucket (optional)
};