blob: 543e77c8b07df596cc674e591c7c5f166f22a8e5 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Date 2014/09/24 16:01:08
#ifndef BVAR_REDUCER_H
#define BVAR_REDUCER_H
#include <limits> // std::numeric_limits
#include "butil/logging.h" // LOG()
#include "butil/type_traits.h" // butil::add_cr_non_integral
#include "butil/class_name.h" // class_name_str
#include "bvar/variable.h" // Variable
#include "bvar/detail/combiner.h" // detail::AgentCombiner
#include "bvar/detail/sampler.h" // ReducerSampler
#include "bvar/detail/series.h"
#include "bvar/window.h"
#if WITH_BABYLON_COUNTER
#include "babylon/concurrent/counter.h"
#endif // WITH_BABYLON_COUNTER
namespace bvar {
namespace detail {
template<typename O, typename T, typename Op>
class SeriesSamplerImpl : public Sampler {
public:
SeriesSamplerImpl(O* owner, const Op& op)
: _owner(owner), _series(op) {}
void take_sample() override { _series.append(_owner->get_value()); }
void describe(std::ostream& os) { _series.describe(os, NULL); }
private:
O* _owner;
Series<T, Op> _series;
};
#if WITH_BABYLON_COUNTER
template<typename T, typename Counter, typename Op, typename InvOp>
class BabylonVariable: public Variable {
public:
typedef ReducerSampler<BabylonVariable, T, Op, InvOp> sampler_type;
typedef SeriesSamplerImpl<BabylonVariable, T, Op> series_sampler_type;
BabylonVariable() = default;
template<typename U = T, typename std::enable_if<
!std::is_constructible<Counter, U>::value, bool>::type = false>
BabylonVariable(U) {}
// For Maxer.
template<typename U = T, typename std::enable_if<
std::is_constructible<Counter, U>::value, bool>::type = false>
BabylonVariable(U default_value) : _counter(default_value) {}
DISALLOW_COPY_AND_MOVE(BabylonVariable);
~BabylonVariable() override {
hide();
if (NULL != _sampler) {
_sampler->destroy();
}
if (NULL != _series_sampler) {
_series_sampler->destroy();
}
}
BabylonVariable& operator<<(T value) {
_counter << value;
return *this;
}
sampler_type* get_sampler() {
if (NULL == _sampler) {
_sampler = new sampler_type(this);
_sampler->schedule();
}
return _sampler;
}
T get_value() const {
return _counter.value();
}
T reset() {
if (BAIDU_UNLIKELY((!butil::is_same<VoidOp, InvOp>::value))) {
CHECK(false) << "You should not call Reducer<" << butil::class_name_str<T>()
<< ", " << butil::class_name_str<Op>() << ">::get_value() when a"
<< " Window<> is used because the operator does not have inverse.";
return get_value();
}
T result = _counter.value();
_counter.reset();
return result;
}
bool valid() const { return true; }
const Op& op() const { return _op; }
const InvOp& inv_op() const { return _inv_op;}
void describe(std::ostream& os, bool quote_string) const override {
if (butil::is_same<T, std::string>::value && quote_string) {
os << '"' << get_value() << '"';
} else {
os << get_value();
}
}
int describe_series(std::ostream& os, const SeriesOptions& options) const override {
if (NULL == _series_sampler) {
return 1;
}
if (!options.test_only) {
_series_sampler->describe(os);
}
return 0;
}
protected:
int expose_impl(const butil::StringPiece& prefix,
const butil::StringPiece& name,
DisplayFilter display_filter) override {
const int rc = Variable::expose_impl(prefix, name, display_filter);
if (rc == 0 && NULL == _series_sampler &&
!butil::is_same<InvOp, VoidOp>::value &&
!butil::is_same<T, std::string>::value &&
FLAGS_save_series) {
_series_sampler = new series_sampler_type(this, _op);
_series_sampler->schedule();
}
return rc;
}
private:
Counter _counter;
sampler_type* _sampler{NULL};
series_sampler_type* _series_sampler{NULL};
Op _op;
InvOp _inv_op;
};
#endif // WITH_BABYLON_COUNTER
} // namespace detail
// Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
// `Op' shall satisfy:
// - associative: a Op (b Op c) == (a Op b) Op c
// - commutative: a Op b == b Op a;
// - no side effects: a Op b never changes if a and b are fixed.
// otherwise the result is undefined.
//
// For performance issues, we don't let Op return value, instead it shall
// set the result to the first parameter in-place. Namely to add two values,
// "+=" should be implemented rather than "+".
//
// Reducer works for non-primitive T which satisfies:
// - T() should be the identity of Op.
// - stream << v should compile and put description of v into the stream
// Example:
// class MyType {
// friend std::ostream& operator<<(std::ostream& os, const MyType&);
// public:
// MyType() : _x(0) {}
// explicit MyType(int x) : _x(x) {}
// void operator+=(const MyType& rhs) const {
// _x += rhs._x;
// }
// private:
// int _x;
// };
// std::ostream& operator<<(std::ostream& os, const MyType& value) {
// return os << "MyType{" << value._x << "}";
// }
// bvar::Adder<MyType> my_type_sum;
// my_type_sum << MyType(1) << MyType(2) << MyType(3);
// LOG(INFO) << my_type_sum; // "MyType{6}"
template <typename T, typename Op, typename InvOp = detail::VoidOp>
class Reducer : public Variable {
public:
typedef detail::AgentCombiner<T, T, Op> combiner_type;
typedef typename combiner_type::self_shared_type shared_combiner_type;
typedef typename combiner_type::Agent agent_type;
typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
typedef detail::SeriesSamplerImpl<Reducer, T, Op> SeriesSampler;
// The `identify' must satisfy: identity Op a == a
explicit Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
const Op& op = Op(), const InvOp& inv_op = InvOp())
: _combiner(std::make_shared<combiner_type>(identity, identity, op))
, _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) {}
~Reducer() override {
// Calling hide() manually is a MUST required by Variable.
hide();
if (_sampler) {
_sampler->destroy();
_sampler = NULL;
}
if (_series_sampler) {
_series_sampler->destroy();
_series_sampler = NULL;
}
}
// Add a value.
// Returns self reference for chaining.
Reducer& operator<<(typename butil::add_cr_non_integral<T>::type value);
// Get reduced value.
// Notice that this function walks through threads that ever add values
// into this reducer. You should avoid calling it frequently.
T get_value() const {
CHECK(!(butil::is_same<InvOp, detail::VoidOp>::value) || _sampler == NULL)
<< "You should not call Reducer<" << butil::class_name_str<T>()
<< ", " << butil::class_name_str<Op>() << ">::get_value() when a"
<< " Window<> is used because the operator does not have inverse.";
return _combiner->combine_agents();
}
// Reset the reduced value to T().
// Returns the reduced value before reset.
T reset() { return _combiner->reset_all_agents(); }
void describe(std::ostream& os, bool quote_string) const override {
if (butil::is_same<T, std::string>::value && quote_string) {
os << '"' << get_value() << '"';
} else {
os << get_value();
}
}
#ifdef BAIDU_INTERNAL
void get_value(boost::any* value) const override { *value = get_value(); }
#endif
// True if this reducer is constructed successfully.
bool valid() const { return _combiner->valid(); }
// Get instance of Op.
const Op& op() const { return _combiner->op(); }
const InvOp& inv_op() const { return _inv_op; }
sampler_type* get_sampler() {
if (NULL == _sampler) {
_sampler = new sampler_type(this);
_sampler->schedule();
}
return _sampler;
}
int describe_series(std::ostream& os, const SeriesOptions& options) const override {
if (_series_sampler == NULL) {
return 1;
}
if (!options.test_only) {
_series_sampler->describe(os);
}
return 0;
}
protected:
int expose_impl(const butil::StringPiece& prefix,
const butil::StringPiece& name,
DisplayFilter display_filter) override {
const int rc = Variable::expose_impl(prefix, name, display_filter);
if (rc == 0 &&
_series_sampler == NULL &&
!butil::is_same<InvOp, detail::VoidOp>::value &&
!butil::is_same<T, std::string>::value &&
FLAGS_save_series) {
_series_sampler = new SeriesSampler(this, _combiner->op());
_series_sampler->schedule();
}
return rc;
}
private:
shared_combiner_type _combiner;
sampler_type* _sampler;
SeriesSampler* _series_sampler;
InvOp _inv_op;
};
template <typename T, typename Op, typename InvOp>
inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
typename butil::add_cr_non_integral<T>::type value) {
// It's wait-free for most time
agent_type* agent = _combiner->get_or_create_tls_agent();
if (__builtin_expect(!agent, 0)) {
LOG(FATAL) << "Fail to create agent";
return *this;
}
agent->element.modify(_combiner->op(), value);
return *this;
}
// =================== Common reducers ===================
// bvar::Adder<int> sum;
// sum << 1 << 2 << 3 << 4;
// LOG(INFO) << sum.get_value(); // 10
// Commonly used functors
namespace detail {
template <typename Tp>
struct AddTo {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const
{ lhs += rhs; }
};
template <typename Tp>
struct MinusFrom {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const
{ lhs -= rhs; }
};
} // namespace detail
template <typename T, typename = void>
class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > {
public:
typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
Adder() : Base() {}
Adder(const butil::StringPiece& name) : Base() {
this->expose(name);
}
Adder(const butil::StringPiece& prefix,
const butil::StringPiece& name) : Base() {
this->expose_as(prefix, name);
}
~Adder() override { Variable::hide(); }
};
#if WITH_BABYLON_COUNTER
// Numerical types supported by babylon counter.
template <typename T>
class Adder<T, std::enable_if<std::is_constructible<babylon::GenericsConcurrentAdder<T>>::value>>
: public detail::BabylonVariable<T, babylon::GenericsConcurrentAdder<T>,
detail::AddTo<T>, detail::MinusFrom<T>> {
public:
typedef T value_type;
private:
typedef detail::BabylonVariable<T, babylon::GenericsConcurrentAdder<T>,
detail::AddTo<value_type>, detail::MinusFrom<value_type>> Base;
public:
typedef detail::AddTo<value_type> Op;
typedef detail::MinusFrom<value_type> InvOp;
typedef typename Base::sampler_type sampler_type;
COMMON_VARIABLE_CONSTRUCTOR(Adder);
};
#endif // WITH_BABYLON_COUNTER
// bvar::Maxer<int> max_value;
// max_value << 1 << 2 << 3 << 4;
// LOG(INFO) << max_value.get_value(); // 4
namespace detail {
template <typename Tp>
struct MaxTo {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const {
// Use operator< as well.
if (lhs < rhs) {
lhs = rhs;
}
}
};
class LatencyRecorderBase;
} // namespace detail
template <typename T, typename = void>
class Maxer : public Reducer<T, detail::MaxTo<T> > {
public:
typedef Reducer<T, detail::MaxTo<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
Maxer() : Base(std::numeric_limits<T>::min()) {}
Maxer(const butil::StringPiece& name)
: Base(std::numeric_limits<T>::min()) {
this->expose(name);
}
Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name)
: Base(std::numeric_limits<T>::min()) {
this->expose_as(prefix, name);
}
~Maxer() override { Variable::hide(); }
private:
friend class detail::LatencyRecorderBase;
// The following private funcition a now used in LatencyRecorder,
// it's dangerous so we don't make them public
explicit Maxer(T default_value) : Base(default_value) {
}
Maxer(T default_value, const butil::StringPiece& prefix,
const butil::StringPiece& name)
: Base(default_value) {
this->expose_as(prefix, name);
}
Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) {
this->expose(name);
}
};
#if WITH_BABYLON_COUNTER
namespace detail {
template <typename T>
class ConcurrentMaxer : public babylon::GenericsConcurrentMaxer<T> {
typedef babylon::GenericsConcurrentMaxer<T> Base;
public:
ConcurrentMaxer() = default;
ConcurrentMaxer(T default_value) : _default_value(default_value) {}
T value() const {
T result;
if (!Base::value(result)) {
return _default_value;
}
return std::max(result, _default_value);
}
private:
T _default_value{0};
};
} // namespace detail
// Numerical types supported by babylon counter.
template <typename T>
class Maxer<T, std::enable_if<std::is_constructible<detail::ConcurrentMaxer<T>>::value>>
: public detail::BabylonVariable<T, detail::ConcurrentMaxer<T>,
detail::MaxTo<T>, detail::VoidOp> {
public:
typedef T value_type;
private:
typedef detail::BabylonVariable<T, detail::ConcurrentMaxer<T>,
detail::MaxTo<value_type>, detail::VoidOp> Base;
public:
typedef detail::MaxTo<value_type> Op;
typedef detail::VoidOp InvOp;
typedef typename Base::sampler_type sampler_type;
COMMON_VARIABLE_CONSTRUCTOR(Maxer);
private:
friend class detail::LatencyRecorderBase;
Maxer(T default_value) : Base(default_value) {}
Maxer(T default_value, const butil::StringPiece& prefix, const butil::StringPiece& name)
: Base(default_value) {
Variable::expose_as(prefix, name);
}
Maxer(T default_value, const butil::StringPiece& name)
: Base(default_value) {
Variable::expose(name);
}
};
#endif // WITH_BABYLON_COUNTER
// bvar::Miner<int> min_value;
// min_value << 1 << 2 << 3 << 4;
// LOG(INFO) << min_value.get_value(); // 1
namespace detail {
template <typename Tp>
struct MinTo {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const {
if (rhs < lhs) {
lhs = rhs;
}
}
};
} // namespace detail
template <typename T, typename = void>
class Miner : public Reducer<T, detail::MinTo<T> > {
public:
typedef Reducer<T, detail::MinTo<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
Miner() : Base(std::numeric_limits<T>::max()) {}
Miner(const butil::StringPiece& name)
: Base(std::numeric_limits<T>::max()) {
this->expose(name);
}
Miner(const butil::StringPiece& prefix, const butil::StringPiece& name)
: Base(std::numeric_limits<T>::max()) {
this->expose_as(prefix, name);
}
~Miner() override { Variable::hide(); }
};
#if WITH_BABYLON_COUNTER
// Numerical types supported by babylon counter.
template <typename T>
class Miner<T, std::enable_if<std::is_constructible<babylon::GenericsConcurrentMiner<T>>::value>>
: public detail::BabylonVariable<T, babylon::GenericsConcurrentMiner<T>,
detail::MinTo<T>, detail::VoidOp> {
public:
typedef T value_type;
private:
typedef detail::BabylonVariable<value_type, babylon::GenericsConcurrentMiner<T>,
detail::MinTo<value_type>, detail::VoidOp> Base;
public:
typedef detail::MinTo<value_type> Op;
typedef detail::VoidOp InvOp;
typedef typename Base::sampler_type sampler_type;
COMMON_VARIABLE_CONSTRUCTOR(Miner);
};
#endif // WITH_BABYLON_COUNTER
} // namespace bvar
#endif //BVAR_REDUCER_H