// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include "utils/ports.h"
// Refer to https://github.com/apache/kudu/blob/master/src/kudu/util/striped64.h
namespace dsn {
// Padded POD container for std::atomic<int64_t>. This prevents false sharing of cache lines.
// Note that in older versions of GCC, `std::is_pod<std::atomic<int64_t>>::value` returns false,
// so cacheline_aligned_int64 is not considered a POD there; however, that doesn't matter here.
class cacheline_aligned_int64
{
public:
static constexpr int kAtomicInt64Size = sizeof(std::atomic<int64_t>);
cacheline_aligned_int64() = default;
inline bool compare_and_set(int64_t cmp, int64_t value)
{
return _value.compare_exchange_weak(cmp, value);
}
// Padding advice from Herb Sutter:
// http://www.drdobbs.com/parallel/eliminate-false-sharing/217500206?pgno=4
std::atomic<int64_t> _value;
char pad[CACHELINE_SIZE > kAtomicInt64Size ? CACHELINE_SIZE - kAtomicInt64Size : 1];
DISALLOW_COPY_AND_ASSIGN(cacheline_aligned_int64);
} CACHELINE_ALIGNED;
using cacheline_aligned_int64_ptr =
std::unique_ptr<cacheline_aligned_int64, std::function<void(cacheline_aligned_int64 *)>>;
extern cacheline_aligned_int64_ptr new_cacheline_aligned_int64();
extern cacheline_aligned_int64_ptr new_cacheline_aligned_int64_array(uint32_t size);
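//
// Illustrative usage sketch (not part of the API contract; it assumes the factory functions
// zero-initialize each cell's _value): allocating a padded counter array and updating one slot.
//
//   dsn::cacheline_aligned_int64_ptr cells = dsn::new_cacheline_aligned_int64_array(4);
//   cells.get()[0]._value.fetch_add(1, std::memory_order_relaxed);
//   int64_t v = cells.get()[0]._value.load(std::memory_order_relaxed); // 1 if zero-initialized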
// This set of classes is heavily derived from JSR166e, released into the public domain
// by Doug Lea and the other authors.
//
// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?view=co
// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?view=co
//
// The striped64 and striped_long_adder implementations here are simplified versions of what's
// present in JSR166e. However, the core ideas remain the same.
//
// Updating a single AtomicInteger in a multi-threaded environment can be quite slow:
//
// 1. False sharing of cache lines with other counters.
// 2. Cache line bouncing from high update rates, especially with many cores.
//
// These two problems are addressed by striped64. When there is no contention, it uses CAS on a
// single base counter to store updates. However, when striped64 detects contention
// (via a failed CAS operation), it will allocate a small, fixed size hashtable of Cells.
// A cacheline_aligned_int64 is a simple padded container that pads an atomic<int64_t> out to a
// full cache line, preventing it from sharing that line with other data.
//
// Reading the value of a striped64 requires traversing the hashtable to calculate the true sum.
//
// Each updating thread uses a thread-local hashcode to determine its cacheline_aligned_int64 in the
// hashtable. If a thread fails to CAS its hashed cacheline_aligned_int64, it will do a lightweight
// rehash operation to try to find an uncontended bucket. Because the hashcode is thread-local,
// this rehash affects all striped64 instances accessed by the thread. This is desirable, since
// contention on one striped64 is indicative of contention elsewhere too.
//
// The hashtable is statically sized to the nearest power of 2 greater than or equal to the
// number of CPUs. This is sufficient because it guarantees the existence of a perfect hash
// function. Due to the random rehashing, the threads should eventually converge to that function.
// In practice, this scheme has proven adequate.
//
// The biggest simplification of this implementation compared to JSR166e is that we do not
// dynamically grow the table, instead immediately allocating it to the full size.
// We also do not lazily allocate each cacheline_aligned_int64, instead allocating the entire array
// at once. This means we waste some additional memory in low contention scenarios, and initial
// allocation will also be slower. Some of the micro-optimizations were also elided for readability.
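//
// A simplified, illustrative sketch of the update path described above (the real logic lives in
// retry_update() in the implementation file, which also allocates and locks the cell table;
// 'num_cells' is a hypothetical name for the table size):
//
//   int64_t b = _base.load();
//   if (cas_base(b, b + x)) {
//       return; // uncontended fast path: update the base counter
//   }
//   // Contended: pick this thread's cell and CAS it; on failure, rehash and retry.
//   cacheline_aligned_int64 &cell = _cells.load()[get_tls_hashcode() & (num_cells - 1)];
//   int64_t v = cell._value.load();
//   if (!cell.compare_and_set(v, v + x)) {
//       // rehash the thread-local hashcode and retry via retry_update()
//   }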
class striped64
{
public:
striped64() = default;
protected:
// NOTE: the destructor is not virtual so that we can ensure that striped64
// has no vtable, thus reducing its size. We make it protected to ensure that
// no one attempts to delete a striped64* and thereby invoke the wrong destructor.
~striped64() = default;
enum rehash
{
kRehash,
kNoRehash
};
// CAS the base field.
inline bool cas_base(int64_t cmp, int64_t val) { return _base.compare_exchange_weak(cmp, val); }
// Handles updates that involve initializing the cell table and/or dealing with contention.
// See above for further explanation.
//
// 'Updater' should be a function which takes the current value and returns
// the new value.
template <class Updater>
void retry_update(rehash to_rehash, Updater updater);
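// For example (an illustrative sketch), an addition of 'x' could be expressed as:
//   retry_update(kRehash, [x](int64_t current) { return current + x; });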
// Sets base and all cells to the given value.
void internal_reset(int64_t initial_value);
// Base value, used mainly when there is no contention, but also as a fallback during
// table initialization races. Updated via CAS.
std::atomic<int64_t> _base{0};
// Owns the memory of the cells. Once this holder is destroyed, the cells are freed.
cacheline_aligned_int64_ptr _cells_holder;
// Table of cells. When non-null, size is the nearest power of 2 >= NCPU.
// If this is set to -1, the pointer is 'locked' and some thread is in the
// process of allocating the array.
std::atomic<cacheline_aligned_int64 *> _cells{nullptr};
static uint64_t get_tls_hashcode();
private:
DISALLOW_COPY_AND_ASSIGN(striped64);
// Static hash code per-thread. Shared across all instances to limit thread-local pollution.
// Also, if a thread hits a collision on one striped64, it's likely to collide on other
// striped64 instances too.
static __thread uint64_t _tls_hashcode;
};
// A 64-bit number optimized for high-volume concurrent updates.
// See striped64 for a longer explanation of the inner workings.
class striped_long_adder : striped64
{
public:
striped_long_adder() = default;
~striped_long_adder() = default;
void increment_by(int64_t x);
// Returns the current value.
// Note this is not an atomic snapshot in the presence of concurrent updates.
int64_t value() const;
// Call reset() ONLY when necessary.
inline void reset() { set(0); }
// Returns the value held immediately before the reset.
int64_t fetch_and_reset();
private:
// `set` is not exposed since it's not an efficient operation.
void set(int64_t val) { internal_reset(val); }
DISALLOW_COPY_AND_ASSIGN(striped_long_adder);
};
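//
// Example usage (an illustrative sketch, not taken from the implementation):
//
//   dsn::striped_long_adder adder;
//   adder.increment_by(5);                      // called concurrently from many threads
//   int64_t current = adder.value();            // non-atomic snapshot of the sum
//   int64_t drained = adder.fetch_and_reset();  // read and reset in one step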
class concurrent_long_adder
{
public:
concurrent_long_adder();
~concurrent_long_adder() = default;
void increment_by(int64_t x);
// Returns the current value.
// Note this is not an atomic snapshot in the presence of concurrent updates.
int64_t value() const;
// Call reset() ONLY when necessary.
inline void reset() { set(0); }
// Returns the value held immediately before the reset.
int64_t fetch_and_reset();
private:
// `set` is not exposed since it's not an efficient operation.
void set(int64_t val);
cacheline_aligned_int64_ptr _cells_holder;
cacheline_aligned_int64 *_cells;
DISALLOW_COPY_AND_ASSIGN(concurrent_long_adder);
};
// A template is used to wrap a long_adder implementation, rather than inheriting from a base
// class, because virtual functions would increase the class size and slow down execution.
template <typename Adder>
class long_adder_wrapper
{
public:
long_adder_wrapper() = default;
~long_adder_wrapper() = default;
inline void increment_by(int64_t x) { adder.increment_by(x); }
inline void increment() { increment_by(1); }
inline void decrement() { increment_by(-1); }
// Returns the current value.
// Note this is not an atomic snapshot in the presence of concurrent updates.
inline int64_t value() const { return adder.value(); }
// Resets the counter state to zero. Call it ONLY when necessary.
inline void reset() { adder.reset(); }
// Returns the value held immediately before the reset.
inline int64_t fetch_and_reset() { return adder.fetch_and_reset(); }
private:
Adder adder;
DISALLOW_COPY_AND_ASSIGN(long_adder_wrapper);
};
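//
// Example (an illustrative sketch; any real aliases are defined elsewhere, the name below is
// hypothetical):
//
//   using my_long_adder = dsn::long_adder_wrapper<dsn::striped_long_adder>;
//   my_long_adder counter;
//   counter.increment();
//   counter.decrement();
//   int64_t total = counter.fetch_and_reset();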
} // namespace dsn