blob: d12ea9d22b0f63d05967c1d7d78cf38c38af3aed [file] [log] [blame]
// Copyright 2015 Cloudera, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/striped64.h"
#include "kudu/util/threadlocal.h"
using kudu::striped64::internal::HashCode;
using kudu::striped64::internal::Cell;
namespace kudu {
namespace striped64 {
namespace internal {
//
// HashCode
//
HashCode::HashCode() {
Random r(MonoTime::Now(MonoTime::FINE).GetDeltaSince(MonoTime::Min()).ToNanoseconds());
const uint64_t hash = r.Next64();
code_ = (hash == 0) ? 1 : hash; // Avoid zero to allow xorShift rehash
}
//
// Cell
//
Cell::Cell()
: value_(0) {
}
} // namespace internal
} // namespace striped64
//
// Striped64
//
const uint32_t Striped64::kNumCpus = sysconf(_SC_NPROCESSORS_ONLN);
DEFINE_STATIC_THREAD_LOCAL(HashCode, Striped64, hashcode_);
Striped64::Striped64()
: busy_(false),
cell_buffer_(NULL),
cells_(NULL),
num_cells_(0) {
}
Striped64::~Striped64() {
// Cell is a POD, so no need to destruct each one.
free(cell_buffer_);
}
void Striped64::RetryUpdate(int64_t x, Rehash contention) {
uint64_t h = hashcode_->code_;
// There are three operations in this loop.
//
// 1. Try to add to the Cell hash table entry for the thread if the table exists.
// When there's contention, rehash to try a different Cell.
// 2. Try to initialize the hash table.
// 3. Try to update the base counter.
//
// These are predicated on successful CAS operations, which is why it's all wrapped in an
// infinite retry loop.
while (true) {
int32_t n = base::subtle::Acquire_Load(&num_cells_);
if (n > 0) {
if (contention == kRehash) {
// CAS failed already, rehash before trying to increment.
contention = kNoRehash;
} else {
Cell *cell = &(cells_[(n - 1) & h]);
int64_t v = cell->value_.Load();
if (cell->CompareAndSet(v, Fn(v, x))) {
// Successfully CAS'd the corresponding cell, done.
break;
}
}
// Rehash since we failed to CAS, either previously or just now.
h ^= h << 13;
h ^= h >> 17;
h ^= h << 5;
} else if (n == 0 && CasBusy()) {
// We think table hasn't been initialized yet, try to do so.
// Recheck preconditions, someone else might have init'd in the meantime.
n = base::subtle::Acquire_Load(&num_cells_);
if (n == 0) {
n = 1;
// Calculate the size. Nearest power of two >= NCPU.
// Also handle a negative NCPU, can happen if sysconf name is unknown
while (kNumCpus > n) {
n <<= 1;
}
// Allocate cache-aligned memory for use by the cells_ table.
int err = posix_memalign(&cell_buffer_, CACHELINE_SIZE, sizeof(Cell)*n);
CHECK_EQ(0, err) << "error calling posix_memalign" << std::endl;
// Initialize the table
cells_ = new (cell_buffer_) Cell[n];
base::subtle::Release_Store(&num_cells_, n);
}
// End critical section
busy_.Store(0);
} else {
// Fallback to adding to the base value.
// Means the table wasn't initialized or we failed to init it.
int64_t v = base_.value_.Load();
if (CasBase(v, Fn(v, x))) {
break;
}
}
}
// Record index for next time
hashcode_->code_ = h;
}
void Striped64::InternalReset(int64_t initialValue) {
const int32_t n = base::subtle::Acquire_Load(&num_cells_);
base_.value_.Store(initialValue);
for (int i = 0; i < n; i++) {
cells_[i].value_.Store(initialValue);
}
}
void LongAdder::IncrementBy(int64_t x) {
INIT_STATIC_THREAD_LOCAL(HashCode, hashcode_);
// Use hash table if present. If that fails, call RetryUpdate to rehash and retry.
// If no hash table, try to CAS the base counter. If that fails, RetryUpdate to init the table.
const int32_t n = base::subtle::Acquire_Load(&num_cells_);
if (n > 0) {
Cell *cell = &(cells_[(n - 1) & hashcode_->code_]);
DCHECK_EQ(0, reinterpret_cast<const uintptr_t>(cell) & (sizeof(Cell) - 1))
<< " unaligned Cell not allowed for Striped64" << std::endl;
const int64_t old = cell->value_.Load();
if (!cell->CompareAndSet(old, old + x)) {
// When we hit a hash table contention, signal RetryUpdate to rehash.
RetryUpdate(x, kRehash);
}
} else {
int64_t b = base_.value_.Load();
if (!base_.CompareAndSet(b, b + x)) {
// Attempt to initialize the table. No need to rehash since the contention was for the
// base counter, not the hash table.
RetryUpdate(x, kNoRehash);
}
}
}
//
// LongAdder
//
int64_t LongAdder::Value() const {
int64_t sum = base_.value_.Load();
const int32_t n = base::subtle::Acquire_Load(&num_cells_);
for (int i = 0; i < n; i++) {
sum += cells_[i].value_.Load();
}
return sum;
}
} // namespace kudu