|  | // Licensed to the Apache Software Foundation (ASF) under one | 
|  | // or more contributor license agreements.  See the NOTICE file | 
|  | // distributed with this work for additional information | 
|  | // regarding copyright ownership.  The ASF licenses this file | 
|  | // to you under the Apache License, Version 2.0 (the | 
|  | // "License"); you may not use this file except in compliance | 
|  | // with the License.  You may obtain a copy of the License at | 
|  | // | 
|  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, | 
|  | // software distributed under the License is distributed on an | 
|  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | // KIND, either express or implied.  See the License for the | 
|  | // specific language governing permissions and limitations | 
|  | // under the License. | 
|  |  | 
|  | #pragma once | 
|  |  | 
|  | #include <atomic> | 
|  | #include <cstdint> | 
|  | #include <functional> | 
|  | #include <memory> | 
|  |  | 
|  | #include "utils/ports.h" | 
|  |  | 
|  | // Refer to https://github.com/apache/kudu/blob/master/src/kudu/util/striped64.h | 
|  |  | 
|  | namespace dsn { | 
|  |  | 
|  | // Padded POD container for std::atomic<int64_t>. This prevents false sharing of cache lines. | 
|  | // Notice that in older versions of GCC `std::is_pod<std::atomic<int64_t>>::value` will return | 
|  | // false, thus cacheline_aligned_int64 is not considered to be a POD. However it doesn't matter. | 
|  | class cacheline_aligned_int64 | 
|  | { | 
|  | public: | 
|  | static constexpr int kAtomicInt64Size = sizeof(std::atomic<int64_t>); | 
|  |  | 
|  | cacheline_aligned_int64() = default; | 
|  |  | 
|  | inline bool compare_and_set(int64_t cmp, int64_t value) | 
|  | { | 
|  | return _value.compare_exchange_weak(cmp, value); | 
|  | } | 
|  |  | 
|  | // Padding advice from Herb Sutter: | 
|  | // http://www.drdobbs.com/parallel/eliminate-false-sharing/217500206?pgno=4 | 
|  | std::atomic<int64_t> _value; | 
|  | char pad[CACHELINE_SIZE > kAtomicInt64Size ? CACHELINE_SIZE - kAtomicInt64Size : 1]; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(cacheline_aligned_int64); | 
|  | } CACHELINE_ALIGNED; | 
|  |  | 
|  | using cacheline_aligned_int64_ptr = | 
|  | std::unique_ptr<cacheline_aligned_int64, std::function<void(cacheline_aligned_int64 *)>>; | 
|  | extern cacheline_aligned_int64_ptr new_cacheline_aligned_int64(); | 
|  | extern cacheline_aligned_int64_ptr new_cacheline_aligned_int64_array(uint32_t size); | 
|  |  | 
|  | // This set of classes is heavily derived from JSR166e, released into the public domain | 
|  | // by Doug Lea and the other authors. | 
|  | // | 
|  | // See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?view=co | 
|  | // See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?view=co | 
|  | // | 
|  | // The striped64 and striped_long_adder implementations here are simplified versions of what's | 
|  | // present in JSR166e. However, the core ideas remain the same. | 
|  | // | 
|  | // Updating a single AtomicInteger in a multi-threaded environment can be quite slow: | 
|  | // | 
|  | //   1. False sharing of cache lines with other counters. | 
|  | //   2. Cache line bouncing from high update rates, especially with many cores. | 
|  | // | 
|  | // These two problems are addressed by striped64. When there is no contention, it uses CAS on a | 
|  | // single base counter to store updates. However, when striped64 detects contention | 
|  | // (via a failed CAS operation), it will allocate a small, fixed size hashtable of Cells. | 
|  | // A cacheline_aligned_int64 is a simple POD that pads out an atomic<int64_t> to 64 bytes to prevent | 
|  | // sharing a cache line. | 
|  | // | 
|  | // Reading the value of a striped64 requires traversing the hashtable to calculate the true sum. | 
|  | // | 
|  | // Each updating thread uses a thread-local hashcode to determine its cacheline_aligned_int64 in the | 
|  | // hashtable. If a thread fails to CAS its hashed cacheline_aligned_int64, it will do a lightweight | 
|  | // rehash operation to try and find an uncontended bucket. Because the hashcode is thread-local, | 
|  | // this rehash affects all striped64's accessed by the thread. This is good, since contention on one | 
|  | // striped64 is indicative of contention elsewhere too. | 
|  | // | 
|  | // The hashtable is statically sized to the nearest power of 2 greater than or equal to the | 
|  | // number of CPUs. This is sufficient, since this guarantees the existence of a perfect hash | 
|  | // function. Due to the random rehashing, the threads should eventually converge to this function. | 
|  | // In practice, this scheme has shown to be sufficient. | 
|  | // | 
|  | // The biggest simplification of this implementation compared to JSR166e is that we do not | 
|  | // dynamically grow the table, instead immediately allocating it to the full size. | 
|  | // We also do not lazily allocate each cacheline_aligned_int64, instead allocating the entire array | 
|  | // at once. This means we waste some additional memory in low contention scenarios, and initial | 
|  | // allocation will also be slower. Some of the micro-optimizations were also elided for readability. | 
|  | class striped64 | 
|  | { | 
|  | public: | 
|  | striped64() = default; | 
|  |  | 
|  | protected: | 
|  | // NOTE: the destructor is not virtual so that we can ensure that striped64 | 
|  | // has no vtable, thus reducing its size. We make it protected to ensure that | 
|  | // no one attempts to delete a striped64* and invokes the wrong destructor. | 
|  | ~striped64() = default; | 
|  |  | 
|  | enum rehash | 
|  | { | 
|  | kRehash, | 
|  | kNoRehash | 
|  | }; | 
|  |  | 
|  | // CAS the base field. | 
|  | inline bool cas_base(int64_t cmp, int64_t val) { return _base.compare_exchange_weak(cmp, val); } | 
|  |  | 
|  | // Handles cases of updates involving initialization, resizing, creating new Cells, and/or | 
|  | // contention. See above for further explanation. | 
|  | // | 
|  | // 'Updater' should be a function which takes the current value and returns | 
|  | // the new value. | 
|  | template <class Updater> | 
|  | void retry_update(rehash to_rehash, Updater updater); | 
|  |  | 
|  | // Sets base and all cells to the given value. | 
|  | void internal_reset(int64_t initial_value); | 
|  |  | 
|  | // Base value, used mainly when there is no contention, but also as a fallback during | 
|  | // table initialization races. Updated via CAS. | 
|  | std::atomic<int64_t> _base{0}; | 
|  |  | 
|  | // Memory manager of cells. Once the destructor is called, cells will be freed. | 
|  | cacheline_aligned_int64_ptr _cells_holder; | 
|  |  | 
|  | // Table of cells. When non-null, size is the nearest power of 2 >= NCPU. | 
|  | // If this is set to -1, the pointer is 'locked' and some thread is in the | 
|  | // process of allocating the array. | 
|  | std::atomic<cacheline_aligned_int64 *> _cells{nullptr}; | 
|  |  | 
|  | static uint64_t get_tls_hashcode(); | 
|  |  | 
|  | private: | 
|  | DISALLOW_COPY_AND_ASSIGN(striped64); | 
|  |  | 
|  | // Static hash code per-thread. Shared across all instances to limit thread-local pollution. | 
|  | // Also, if a thread hits a collision on one striped64, it's also likely to collide on | 
|  | // other striped64s too. | 
|  | static __thread uint64_t _tls_hashcode; | 
|  | }; | 
|  |  | 
|  | // A 64-bit number optimized for high-volume concurrent updates. | 
|  | // See striped64 for a longer explanation of the inner workings. | 
|  | class striped_long_adder : striped64 | 
|  | { | 
|  | public: | 
|  | striped_long_adder() = default; | 
|  |  | 
|  | ~striped_long_adder() = default; | 
|  |  | 
|  | void increment_by(int64_t x); | 
|  |  | 
|  | // Returns the current value. | 
|  | // Note this is not an atomic snapshot in the presence of concurrent updates. | 
|  | int64_t value() const; | 
|  |  | 
|  | // Call reset() ONLY when necessary. | 
|  | inline void reset() { set(0); } | 
|  |  | 
|  | // Return the value immediately before it's reset. | 
|  | int64_t fetch_and_reset(); | 
|  |  | 
|  | private: | 
|  | // `set` is not exposed since it's not an efficient operation | 
|  | void set(int64_t val) { internal_reset(val); } | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(striped_long_adder); | 
|  | }; | 
|  |  | 
|  | class concurrent_long_adder | 
|  | { | 
|  | public: | 
|  | concurrent_long_adder(); | 
|  | ~concurrent_long_adder() = default; | 
|  |  | 
|  | void increment_by(int64_t x); | 
|  |  | 
|  | // Returns the current value. | 
|  | // Note this is not an atomic snapshot in the presence of concurrent updates. | 
|  | int64_t value() const; | 
|  |  | 
|  | // Call reset() ONLY when necessary. | 
|  | inline void reset() { set(0); } | 
|  |  | 
|  | // Return the value immediately before it's reset. | 
|  | int64_t fetch_and_reset(); | 
|  |  | 
|  | private: | 
|  | // `set` is not exposed since it's not an efficient operation | 
|  | void set(int64_t val); | 
|  |  | 
|  | cacheline_aligned_int64_ptr _cells_holder; | 
|  | cacheline_aligned_int64 *_cells; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(concurrent_long_adder); | 
|  | }; | 
|  |  | 
|  | // Use template to wrap a long_adder implementation rather than inherit from a base class for | 
|  | // the reason that virtual function will increase the class size and slow down the execution. | 
|  | template <typename Adder> | 
|  | class long_adder_wrapper | 
|  | { | 
|  | public: | 
|  | long_adder_wrapper() = default; | 
|  |  | 
|  | ~long_adder_wrapper() = default; | 
|  |  | 
|  | inline void increment_by(int64_t x) { adder.increment_by(x); } | 
|  | inline void increment() { increment_by(1); } | 
|  | inline void decrement() { increment_by(-1); } | 
|  |  | 
|  | // Returns the current value. | 
|  | // Note this is not an atomic snapshot in the presence of concurrent updates. | 
|  | inline int64_t value() const { return adder.value(); } | 
|  |  | 
|  | // Resets the counter state to zero. Call it ONLY when necessary. | 
|  | inline void reset() { adder.reset(); } | 
|  |  | 
|  | // Return the value immediately before it's reset. | 
|  | inline int64_t fetch_and_reset() { return adder.fetch_and_reset(); } | 
|  |  | 
|  | private: | 
|  | Adder adder; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(long_adder_wrapper); | 
|  | }; | 
|  |  | 
|  | } // namespace dsn |