blob: fbd4fa78d1f1d1ede290c17572373c141d1b3a7d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Date 2014/09/24 16:01:08
#ifndef BVAR_REDUCER_H
#define BVAR_REDUCER_H
#include <limits> // std::numeric_limits
#include "butil/logging.h" // LOG()
#include "butil/type_traits.h" // butil::add_cr_non_integral
#include "butil/class_name.h" // class_name_str
#include "bvar/variable.h" // Variable
#include "bvar/detail/combiner.h" // detail::AgentCombiner
#include "bvar/detail/sampler.h" // ReducerSampler
#include "bvar/detail/series.h"
#include "bvar/window.h"
namespace bvar {
// Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
// `Op' shall satisfy:
// - associative: a Op (b Op c) == (a Op b) Op c
// - commutative: a Op b == b Op a;
// - no side effects: a Op b never changes if a and b are fixed.
// otherwise the result is undefined.
//
// For performance issues, we don't let Op return value, instead it shall
// set the result to the first parameter in-place. Namely to add two values,
// "+=" should be implemented rather than "+".
//
// Reducer works for non-primitive T which satisfies:
// - T() should be the identity of Op.
// - stream << v should compile and put description of v into the stream
// Example:
// class MyType {
// friend std::ostream& operator<<(std::ostream& os, const MyType&);
// public:
// MyType() : _x(0) {}
// explicit MyType(int x) : _x(x) {}
// void operator+=(const MyType& rhs) const {
// _x += rhs._x;
// }
// private:
// int _x;
// };
// std::ostream& operator<<(std::ostream& os, const MyType& value) {
// return os << "MyType{" << value._x << "}";
// }
// bvar::Adder<MyType> my_type_sum;
// my_type_sum << MyType(1) << MyType(2) << MyType(3);
// LOG(INFO) << my_type_sum; // "MyType{6}"
template <typename T, typename Op, typename InvOp = detail::VoidOp>
class Reducer : public Variable {
public:
typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
typedef typename combiner_type::Agent agent_type;
typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
class SeriesSampler : public detail::Sampler {
public:
SeriesSampler(Reducer* owner, const Op& op)
: _owner(owner), _series(op) {}
~SeriesSampler() {}
void take_sample() override { _series.append(_owner->get_value()); }
void describe(std::ostream& os) { _series.describe(os, NULL); }
private:
Reducer* _owner;
detail::Series<T, Op> _series;
};
public:
// The `identify' must satisfy: identity Op a == a
Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
const Op& op = Op(),
const InvOp& inv_op = InvOp())
: _combiner(identity, identity, op)
, _sampler(NULL)
, _series_sampler(NULL)
, _inv_op(inv_op) {
}
~Reducer() {
// Calling hide() manually is a MUST required by Variable.
hide();
if (_sampler) {
_sampler->destroy();
_sampler = NULL;
}
if (_series_sampler) {
_series_sampler->destroy();
_series_sampler = NULL;
}
}
// Add a value.
// Returns self reference for chaining.
Reducer& operator<<(typename butil::add_cr_non_integral<T>::type value);
// Get reduced value.
// Notice that this function walks through threads that ever add values
// into this reducer. You should avoid calling it frequently.
T get_value() const {
CHECK(!(butil::is_same<InvOp, detail::VoidOp>::value) || _sampler == NULL)
<< "You should not call Reducer<" << butil::class_name_str<T>()
<< ", " << butil::class_name_str<Op>() << ">::get_value() when a"
<< " Window<> is used because the operator does not have inverse.";
return _combiner.combine_agents();
}
// Reset the reduced value to T().
// Returns the reduced value before reset.
T reset() { return _combiner.reset_all_agents(); }
void describe(std::ostream& os, bool quote_string) const override {
if (butil::is_same<T, std::string>::value && quote_string) {
os << '"' << get_value() << '"';
} else {
os << get_value();
}
}
#ifdef BAIDU_INTERNAL
void get_value(boost::any* value) const override { *value = get_value(); }
#endif
// True if this reducer is constructed successfully.
bool valid() const { return _combiner.valid(); }
// Get instance of Op.
const Op& op() const { return _combiner.op(); }
const InvOp& inv_op() const { return _inv_op; }
sampler_type* get_sampler() {
if (NULL == _sampler) {
_sampler = new sampler_type(this);
_sampler->schedule();
}
return _sampler;
}
int describe_series(std::ostream& os, const SeriesOptions& options) const override {
if (_series_sampler == NULL) {
return 1;
}
if (!options.test_only) {
_series_sampler->describe(os);
}
return 0;
}
protected:
int expose_impl(const butil::StringPiece& prefix,
const butil::StringPiece& name,
DisplayFilter display_filter) override {
const int rc = Variable::expose_impl(prefix, name, display_filter);
if (rc == 0 &&
_series_sampler == NULL &&
!butil::is_same<InvOp, detail::VoidOp>::value &&
!butil::is_same<T, std::string>::value &&
FLAGS_save_series) {
_series_sampler = new SeriesSampler(this, _combiner.op());
_series_sampler->schedule();
}
return rc;
}
private:
combiner_type _combiner;
sampler_type* _sampler;
SeriesSampler* _series_sampler;
InvOp _inv_op;
};
template <typename T, typename Op, typename InvOp>
inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
typename butil::add_cr_non_integral<T>::type value) {
// It's wait-free for most time
agent_type* agent = _combiner.get_or_create_tls_agent();
if (__builtin_expect(!agent, 0)) {
LOG(FATAL) << "Fail to create agent";
return *this;
}
agent->element.modify(_combiner.op(), value);
return *this;
}
// =================== Common reducers ===================
// bvar::Adder<int> sum;
// sum << 1 << 2 << 3 << 4;
// LOG(INFO) << sum.get_value(); // 10
// Commonly used functors
namespace detail {
template <typename Tp>
struct AddTo {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const
{ lhs += rhs; }
};
template <typename Tp>
struct MinusFrom {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const
{ lhs -= rhs; }
};
}
template <typename T>
class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > {
public:
typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
public:
Adder() : Base() {}
explicit Adder(const butil::StringPiece& name) : Base() {
this->expose(name);
}
Adder(const butil::StringPiece& prefix,
const butil::StringPiece& name) : Base() {
this->expose_as(prefix, name);
}
~Adder() { Variable::hide(); }
};
// bvar::Maxer<int> max_value;
// max_value << 1 << 2 << 3 << 4;
// LOG(INFO) << max_value.get_value(); // 4
namespace detail {
template <typename Tp>
struct MaxTo {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const {
// Use operator< as well.
if (lhs < rhs) {
lhs = rhs;
}
}
};
class LatencyRecorderBase;
}
template <typename T>
class Maxer : public Reducer<T, detail::MaxTo<T> > {
public:
typedef Reducer<T, detail::MaxTo<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
public:
Maxer() : Base(std::numeric_limits<T>::min()) {}
explicit Maxer(const butil::StringPiece& name)
: Base(std::numeric_limits<T>::min()) {
this->expose(name);
}
Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name)
: Base(std::numeric_limits<T>::min()) {
this->expose_as(prefix, name);
}
~Maxer() { Variable::hide(); }
private:
friend class detail::LatencyRecorderBase;
// The following private funcition a now used in LatencyRecorder,
// it's dangerous so we don't make them public
explicit Maxer(T default_value) : Base(default_value) {
}
Maxer(T default_value, const butil::StringPiece& prefix,
const butil::StringPiece& name)
: Base(default_value) {
this->expose_as(prefix, name);
}
Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) {
this->expose(name);
}
};
// bvar::Miner<int> min_value;
// min_value << 1 << 2 << 3 << 4;
// LOG(INFO) << min_value.get_value(); // 1
namespace detail {
template <typename Tp>
struct MinTo {
void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const {
if (rhs < lhs) {
lhs = rhs;
}
}
};
} // namespace detail
template <typename T>
class Miner : public Reducer<T, detail::MinTo<T> > {
public:
typedef Reducer<T, detail::MinTo<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
public:
Miner() : Base(std::numeric_limits<T>::max()) {}
explicit Miner(const butil::StringPiece& name)
: Base(std::numeric_limits<T>::max()) {
this->expose(name);
}
Miner(const butil::StringPiece& prefix, const butil::StringPiece& name)
: Base(std::numeric_limits<T>::max()) {
this->expose_as(prefix, name);
}
~Miner() { Variable::hide(); }
};
} // namespace bvar
#endif //BVAR_REDUCER_H