blob: 2e9e38013e16cc4cab0de95006682f35a5eee529 [file]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_LIMITER_HPP__
#define __PROCESS_LIMITER_HPP__
#include <deque>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/future.hpp>
#include <process/process.hpp>
#include <process/timeout.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/nothing.hpp>
namespace process {
// Forward declaration: RateLimiter only stores a pointer to this
// class, whose full definition follows further below in this header.
class RateLimiterProcess;
// Provides an abstraction that rate limits the number of "permits"
// that can be acquired over some duration.
//
// NOTE: Currently, each libprocess Process should use a separate
// RateLimiter instance. This is because if multiple processes share
// a RateLimiter instance, by the time a process acts on the Future
// returned by 'acquire()' another process might have acquired the
// next permit and performed its rate limited operation.
class RateLimiter
{
public:
  // Creates a limiter that allows 'permits' acquisitions per
  // 'duration'. Both values must be positive; this is enforced with
  // CHECKs when the backing RateLimiterProcess is constructed.
  RateLimiter(int permits, const Duration& duration);

  // Creates a limiter that allows 'permitsPerSecond' acquisitions
  // per second. The rate must be positive (CHECKed by the backing
  // RateLimiterProcess).
  explicit RateLimiter(double permitsPerSecond);

  // Terminates, waits for and deletes the backing process.
  virtual ~RateLimiter();

  // Returns a future that becomes ready when the permit is acquired.
  // Discarding this future cancels this acquisition.
  virtual Future<Nothing> acquire() const;

private:
  // Not copyable, not assignable: the destructor deletes 'process',
  // so a copy would end up deleting the same process twice.
  RateLimiter(const RateLimiter&) = delete;
  RateLimiter& operator=(const RateLimiter&) = delete;

  // The spawned process that does the actual bookkeeping. Created in
  // the constructors and deleted in the destructor.
  RateLimiterProcess* process;
};
// The libprocess process that backs a RateLimiter.
//
// A permit is granted at most once every (1 / permitsPerSecond)
// seconds: 'timeout' is re-armed each time a permit is handed out and
// acquisitions that arrive before it has expired are queued, in FIFO
// order, in 'promises'.
class RateLimiterProcess : public Process<RateLimiterProcess>
{
public:
  // Limits the rate to 'permits' acquisitions per 'duration'. Both
  // values must be positive.
  RateLimiterProcess(int permits, const Duration& duration)
    : ProcessBase(ID::generate("__limiter__"))
  {
    CHECK_GT(permits, 0);
    CHECK_GT(duration.secs(), 0);

    // Computed after the checks above, so the divisor is known to be
    // positive at this point.
    permitsPerSecond = permits / duration.secs();
  }

  // Limits the rate to '_permitsPerSecond' acquisitions per second.
  // The rate must be positive.
  explicit RateLimiterProcess(double _permitsPerSecond)
    : ProcessBase(ID::generate("__limiter__")),
      permitsPerSecond(_permitsPerSecond)
  {
    CHECK_GT(permitsPerSecond, 0);
  }

  // Discards and frees every acquisition that is still queued.
  void finalize() override
  {
    foreach (Promise<Nothing>* promise, promises) {
      promise->discard();
      delete promise;
    }
    promises.clear();
  }

  // Requests a permit.
  //
  // If nobody is queued and 'timeout' has expired, the permit is
  // granted right away: 'timeout' is re-armed and a ready future is
  // returned. Otherwise the request is appended to the queue and the
  // returned future is completed later by '_acquire'. A discard
  // request on the returned future is routed to 'discard'.
  Future<Nothing> acquire()
  {
    if (!promises.empty()) {
      // Need to wait for others to get permits first. No new timer is
      // started here: the branch below and '_acquire' keep one
      // scheduled for as long as the queue is non-empty.
      return enqueue();
    }

    if (timeout.remaining() > Seconds(0)) {
      // Need to wait a bit longer, but first one in the queue.
      Future<Nothing> future = enqueue();
      delay(timeout.remaining(), self(), &Self::_acquire);
      return future;
    }

    // No need to wait!
    timeout = Seconds(1) / permitsPerSecond;
    return Nothing();
  }

private:
  // Not copyable, not assignable.
  RateLimiterProcess(const RateLimiterProcess&) = delete;
  RateLimiterProcess& operator=(const RateLimiterProcess&) = delete;

  // Appends a new pending acquisition to the queue and returns its
  // future, wired so that a discard request on that future is
  // deferred to 'discard' on this process.
  Future<Nothing> enqueue()
  {
    Promise<Nothing>* promise = new Promise<Nothing>();
    promises.push_back(promise);

    return promise->future()
      .onDiscard(defer(self(), &Self::discard, promise->future()));
  }

  // Timer callback: grants a permit to the oldest acquisition that
  // has not been discarded and, if more acquisitions are waiting,
  // schedules itself again for when the next permit is due.
  void _acquire()
  {
    CHECK(!promises.empty());

    // Keep removing the top of the queue until we find a promise
    // whose future is not discarded.
    while (!promises.empty()) {
      Promise<Nothing>* promise = promises.front();
      promises.pop_front();

      if (!promise->future().isDiscarded()) {
        promise->set(Nothing());
        delete promise;

        // A permit was handed out, so start the next waiting period.
        timeout = Seconds(1) / permitsPerSecond;
        break;
      } else {
        // Discarded acquisitions do not consume a permit.
        delete promise;
      }
    }

    // Repeat if necessary.
    if (!promises.empty()) {
      delay(timeout.remaining(), self(), &Self::_acquire);
    }
  }

  // Handles a discard request for 'future' by discarding the queued
  // promise it belongs to. The promise stays in the queue and is
  // removed (and freed) by '_acquire' or 'finalize'. This is a no-op
  // if no queued promise matches.
  void discard(const Future<Nothing>& future)
  {
    foreach (Promise<Nothing>* promise, promises) {
      if (promise->future() == future) {
        promise->discard();

        // Every queued promise was created with its own future (see
        // 'enqueue'), so there is nothing further to look for.
        return;
      }
    }
  }

  // Number of permits granted per second; always positive.
  double permitsPerSecond;

  // Expires when the next permit may be granted.
  Timeout timeout;

  // Pending acquisitions, oldest first. The promises are owned by
  // this process and freed in '_acquire' or 'finalize'.
  std::deque<Promise<Nothing>*> promises;
};
// Creates and spawns the backing process, configured to allow
// 'permits' acquisitions per 'duration'.
inline RateLimiter::RateLimiter(int permits, const Duration& duration)
  : process(new RateLimiterProcess(permits, duration))
{
  spawn(process);
}
// Creates and spawns the backing process, configured to allow
// 'permitsPerSecond' acquisitions per second.
inline RateLimiter::RateLimiter(double permitsPerSecond)
  : process(new RateLimiterProcess(permitsPerSecond))
{
  spawn(process);
}
// Shuts the limiter down: requests termination of the backing
// process, waits on it, and only then frees it. The order matters,
// since the process must not be deleted before the wait returns.
// NOTE(review): RateLimiterProcess::finalize() discards any queued
// acquisitions; presumably libprocess invokes it as part of
// termination -- confirm.
inline RateLimiter::~RateLimiter()
{
terminate(process);
wait(process);
delete process;
}
// Requests a permit by dispatching RateLimiterProcess::acquire() to
// the backing process and returns the future produced by that
// dispatch.
inline Future<Nothing> RateLimiter::acquire() const
{
return dispatch(process, &RateLimiterProcess::acquire);
}
} // namespace process {
#endif // __PROCESS_LIMITER_HPP__