blob: 6bc15d87cb68cd540710a28ba33cf9bd8e11ae04 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include "common/compiler_util.h"
#include "gutil/macros.h"
namespace doris {
class CoreDataAllocator {
public:
virtual ~CoreDataAllocator() { }
virtual void* get_or_create(size_t id) = 0;
};
class CoreDataAllocatorFactory {
public:
CoreDataAllocatorFactory() { }
~CoreDataAllocatorFactory();
CoreDataAllocator* get_allocator(size_t cpu_id, size_t data_bytes);
static CoreDataAllocatorFactory* instance();
private:
DISALLOW_COPY_AND_ASSIGN(CoreDataAllocatorFactory);
private:
std::mutex _lock;
std::map<std::pair<size_t, size_t>, CoreDataAllocator*> _allocators;
};
template<typename T>
class CoreLocalValueController {
public:
CoreLocalValueController() {
int num_cpus = static_cast<int>(std::thread::hardware_concurrency());
_size = 8;
while (_size < num_cpus) {
_size <<= 1;
}
_allocators.resize(_size, nullptr);
for (int i = 0; i < _size; ++i) {
_allocators[i] = CoreDataAllocatorFactory::instance()->get_allocator(i, sizeof(T));
}
}
~CoreLocalValueController() { }
int get_id() {
std::lock_guard<std::mutex> l(_lock);
int id = 0;
if (_free_ids.empty()) {
id = _next_id++;
} else {
id = _free_ids.back();
_free_ids.pop_back();
}
return id;
}
void reclaim_id(int id) {
std::lock_guard<std::mutex> l(_lock);
_free_ids.push_back(id);
}
size_t size() const { return _size; }
CoreDataAllocator* allocator(int i) const {
return _allocators[i];
}
static CoreLocalValueController<T>* instance() {
static CoreLocalValueController<T> _s_instance;
return &_s_instance;
}
private:
DISALLOW_COPY_AND_ASSIGN(CoreLocalValueController);
private:
std::mutex _lock;
int _next_id = 0;
std::deque<int> _free_ids;
std::vector<CoreDataAllocator*> _allocators;
size_t _size;
};
template<typename T>
class CoreLocalValue {
public:
CoreLocalValue(const T init_value = T()) {
CoreLocalValueController<T>* controller = CoreLocalValueController<T>::instance();
_id = controller->get_id();
_size = controller->size();
_values.resize(_size, nullptr);
for (int i = 0; i < _size; ++i) {
void* ptr = controller->allocator(i)->get_or_create(_id);
_values[i] = new (ptr) T(init_value);
}
}
~CoreLocalValue() {
for (int i = 0; i < _size; ++i) {
_values[i]->~T();
}
CoreLocalValueController<T>::instance()->reclaim_id(_id);
}
inline size_t size() const { return _size; }
inline T* access() const {
size_t cpu_id = sched_getcpu();
if (cpu_id >= _size) {
cpu_id &= _size - 1;
}
return access_at_core(cpu_id);
}
inline T* access_at_core(size_t core_idx) const {
return _values[core_idx];
}
private:
int _id = -1;
size_t _size = 0;
std::vector<T*> _values;
};
}