blob: a7ffec709c58fd1203c26116d53cfd363fd8a0ac [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_STRIPED64_H_
#define KUDU_UTIL_STRIPED64_H_
#include "kudu/gutil/port.h"
#include "kudu/util/atomic.h"
#include "kudu/util/threadlocal.h"
namespace kudu {
class Striped64;
namespace striped64 {
namespace internal {
struct HashCode {
public:
HashCode();
uint64_t code_;
};
#define ATOMIC_INT_SIZE sizeof(AtomicInt<int64_t>)
// Padded POD container for AtomicInt. This prevents false sharing of cache lines.
class Cell {
public:
Cell();
inline bool CompareAndSet(int64_t cmp, int64_t value) {
return value_.CompareAndSet(cmp, value);
}
// Padding advice from Herb Sutter:
// http://www.drdobbs.com/parallel/eliminate-false-sharing/217500206?pgno=4
AtomicInt<int64_t> value_;
char pad[CACHELINE_SIZE > ATOMIC_INT_SIZE ?
CACHELINE_SIZE - ATOMIC_INT_SIZE : 1];
DISALLOW_COPY_AND_ASSIGN(Cell);
} CACHELINE_ALIGNED;
#undef ATOMIC_INT_SIZE
} // namespace internal
} // namespace striped64
// This set of classes is heavily derived from JSR166e, released into the public domain
// by Doug Lea and the other authors.
//
// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?view=co
// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?view=co
//
// The Striped64 and LongAdder implementations here are simplified versions of what's present in
// JSR166e. However, the core ideas remain the same.
//
// Updating a single AtomicInteger in a multi-threaded environment can be quite slow:
//
// 1. False sharing of cache lines with other counters.
// 2. Cache line bouncing from high update rates, especially with many cores.
//
// These two problems are addressed by Striped64. When there is no contention, it uses CAS on a
// single base counter to store updates. However, when Striped64 detects contention
// (via a failed CAS operation), it will allocate a small, fixed size hashtable of Cells.
// A Cell is a simple POD that pads out an AtomicInt to 64 bytes to prevent
// sharing a cache line.
//
// Reading the value of a Striped64 requires traversing the hashtable to calculate the true sum.
//
// Each updating thread uses a thread-local hashcode to determine its Cell in the hashtable.
// If a thread fails to CAS its hashed Cell, it will do a lightweight rehash operation to try
// and find an uncontended bucket. Because the hashcode is thread-local, this rehash affects all
// Striped64's accessed by the thread. This is good, since contention on one Striped64 is
// indicative of contention elsewhere too.
//
// The hashtable is statically sized to the nearest power of 2 greater than or equal to the
// number of CPUs. This is sufficient, since this guarantees the existence of a perfect hash
// function. Due to the random rehashing, the threads should eventually converge to this function.
// In practice, this scheme has shown to be sufficient.
//
// The biggest simplification of this implementation compared to JSR166e is that we do not
// dynamically grow the table, instead immediately allocating it to the full size.
// We also do not lazily allocate each Cell, instead allocating the entire array at once.
// This means we waste some additional memory in low contention scenarios, and initial allocation
// will also be slower. Some of the micro-optimizations were also elided for readability.
class Striped64 {
public:
Striped64();
virtual ~Striped64();
protected:
enum Rehash {
kRehash,
kNoRehash
};
// CAS the base field.
bool CasBase(int64_t cmp, int64_t val) { return base_.CompareAndSet(cmp, val); }
// CAS the busy field from 0 to 1 to acquire the lock.
bool CasBusy() { return busy_.CompareAndSet(0, 1); }
// Computes the function of the current and new value. Used in RetryUpdate.
virtual int64_t Fn(int64_t current_value, int64_t new_value) = 0;
// Handles cases of updates involving initialization, resizing, creating new Cells, and/or
// contention. See above for further explanation.
void RetryUpdate(int64_t x, Rehash to_rehash);
// Sets base and all cells to the given value.
void InternalReset(int64_t initialValue);
// Base value, used mainly when there is no contention, but also as a fallback during
// table initialization races. Updated via CAS.
striped64::internal::Cell base_;
// CAS lock used when resizing and/or creating cells.
AtomicBool busy_;
// Backing buffer for cells_, used for alignment.
void* cell_buffer_;
// Table of cells. When non-null, size is the nearest power of 2 >= NCPU.
striped64::internal::Cell* cells_;
int32_t num_cells_;
// Static hash code per-thread. Shared across all instances to limit thread-local pollution.
// Also, if a thread hits a collision on one Striped64, it's also likely to collide on
// other Striped64s too.
DECLARE_STATIC_THREAD_LOCAL(striped64::internal::HashCode, hashcode_);
private:
// Number of CPUs, to place bound on table size.
static const uint32_t kNumCpus;
};
// A 64-bit number optimized for high-volume concurrent updates.
// See Striped64 for a longer explanation of the inner workings.
class LongAdder : Striped64 {
public:
LongAdder() {}
void IncrementBy(int64_t x);
void Increment() { IncrementBy(1); }
void Decrement() { IncrementBy(-1); }
// Returns the current value.
// Note this is not an atomic snapshot in the presence of concurrent updates.
int64_t Value() const;
// Resets the counter state to zero.
void Reset() { InternalReset(0); }
private:
int64_t Fn(int64_t current_value, int64_t new_value) override {
return current_value + new_value;
}
DISALLOW_COPY_AND_ASSIGN(LongAdder);
};
} // namespace kudu
#endif