blob: 85d5d08521621a2973de59e051c1671a77b3a82b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/SpaceSaving.h.h
// and modified by Doris
#pragma once
#include <boost/range/adaptor/reversed.hpp>
#include "vec/common/arena_with_free_lists.h"
#include "vec/common/hash_table/hash_map.h"
#include "vec/common/string_buffer.hpp"
#include "vec/io/io_helper.h"
namespace doris::vectorized {
template <typename TKey>
struct SpaceSavingArena {
SpaceSavingArena() = default;
TKey emplace(const TKey& key) { return key; }
void free(const TKey& /*key*/) {}
};
template <>
struct SpaceSavingArena<StringRef> {
StringRef emplace(StringRef key) {
if (!key.data) {
return key;
}
return copy_string_in_arena(arena, key);
}
void free(StringRef key) {
if (key.data) {
arena.free(const_cast<char*>(key.data), key.size);
}
}
template <typename Arena>
inline StringRef copy_string_in_arena(Arena& arena, StringRef value) {
size_t value_size = value.size;
char* place_for_key = arena.alloc(value_size);
memcpy(reinterpret_cast<void*>(place_for_key), reinterpret_cast<const void*>(value.data),
value_size);
StringRef result {place_for_key, value_size};
return result;
}
private:
ArenaWithFreeLists arena;
};
template <typename TKey, typename Hash = DefaultHash<TKey>>
class SpaceSaving {
private:
// This function calculates the next size for the alpha map based on the given value `x`.
// It uses `alpha_map_elements_per_counter` to estimate the required number of elements.
constexpr uint64_t next_alpha_size(uint64_t x) {
constexpr uint64_t alpha_map_elements_per_counter = 6;
return 1ULL << (sizeof(uint64_t) * 8 -
std::countl_zero(x * alpha_map_elements_per_counter));
}
public:
using Self = SpaceSaving;
struct Counter {
Counter() = default;
explicit Counter(const TKey& k, uint64_t c = 0, uint64_t e = 0, size_t h = 0)
: key(k), hash(h), count(c), error(e) {}
void write(BufferWritable& wb) const {
wb.write_binary(key);
wb.write_var_uint(count);
wb.write_var_uint(error);
}
void read(BufferReadable& rb) {
rb.read_binary(key);
rb.read_var_uint(count);
rb.read_var_uint(error);
}
bool operator>(const Counter& b) const {
return (count > b.count) || (count == b.count && error < b.error);
}
TKey key;
size_t slot = 0;
size_t hash = 0;
uint64_t count = 0;
uint64_t error = 0;
};
explicit SpaceSaving(size_t c = 10) : alpha_map(next_alpha_size(c)), m_capacity(c) {}
~SpaceSaving() { destroy_elements(); }
size_t size() const { return counter_list.size(); }
size_t capacity() const { return m_capacity; }
void clear() { return destroy_elements(); }
void resize(size_t new_capacity) {
counter_list.reserve(new_capacity);
alpha_map.resize(next_alpha_size(new_capacity));
m_capacity = new_capacity;
}
// Inserts a new element or updates the count of an existing element.
// If the element exists, the count and error are increased.
// If the element doesn't exist and the capacity is not full, it inserts the new element.
// If the capacity is full, it replaces the element with the smallest count and inserts the new one.
void insert(const TKey& key, uint64_t increment = 1, uint64_t error = 0) {
auto hash = counter_map.hash(key);
if (auto* counter = find_counter(key, hash); counter) {
counter->count += increment;
counter->error += error;
percolate(counter);
return;
}
if (UNLIKELY(size() < capacity())) {
push(std::make_unique<Counter>(arena.emplace(key), increment, error, hash));
return;
}
auto& min = counter_list.back();
if (increment > min->count) {
destroy_last_element();
push(std::make_unique<Counter>(arena.emplace(key), increment, error, hash));
return;
}
const size_t alpha_mask = alpha_map.size() - 1;
auto& alpha = alpha_map[hash & alpha_mask];
if (alpha + increment < min->count) {
alpha += increment;
return;
}
alpha_map[min->hash & alpha_mask] = min->count;
destroy_last_element();
push(std::make_unique<Counter>(arena.emplace(key), alpha + increment, alpha + error, hash));
}
// Merges another `SpaceSaving` object into the current one. Updates counts and errors of elements.
// If the other object is full, it adds its elements to the current list and maintains sorting.
void merge(const Self& rhs) {
if (!rhs.size()) {
return;
}
uint64_t m1 = 0;
uint64_t m2 = 0;
if (size() == capacity()) {
m1 = counter_list.back()->count;
}
if (rhs.size() == rhs.capacity()) {
m2 = rhs.counter_list.back()->count;
}
if (m2 > 0) {
for (auto& counter : counter_list) {
counter->count += m2;
counter->error += m2;
}
}
for (auto& counter : boost::adaptors::reverse(rhs.counter_list)) {
size_t hash = counter_map.hash(counter->key);
if (auto* current = find_counter(counter->key, hash)) {
current->count += (counter->count - m2);
current->error += (counter->error - m2);
} else {
counter_list.push_back(std::make_unique<Counter>(arena.emplace(counter->key),
counter->count + m1,
counter->error + m1, hash));
}
}
std::sort(counter_list.begin(), counter_list.end(),
[](const auto& l, const auto& r) { return *l > *r; });
if (counter_list.size() > m_capacity) {
for (size_t i = m_capacity; i < counter_list.size(); ++i) {
arena.free(counter_list[i]->key);
}
counter_list.resize(m_capacity);
}
for (size_t i = 0; i < counter_list.size(); ++i) {
counter_list[i]->slot = i;
}
rebuild_counter_map();
}
// Retrieves the top-k counters, sorted by their count and error values.
std::vector<Counter> top_k(size_t k) const {
std::vector<Counter> res;
for (auto& counter : counter_list) {
res.push_back(*counter);
if (res.size() == k) {
break;
}
}
return res;
}
void write(BufferWritable& wb) const {
wb.write_var_uint(size());
for (auto& counter : counter_list) {
counter->write(wb);
}
wb.write_var_uint(alpha_map.size());
for (auto alpha : alpha_map) {
wb.write_var_uint(alpha);
}
}
void read(BufferReadable& rb) {
destroy_elements();
uint64_t count = 0;
rb.read_var_uint(count);
for (UInt64 i = 0; i < count; ++i) {
std::unique_ptr counter = std::make_unique<Counter>();
counter->read(rb);
counter->hash = counter_map.hash(counter->key);
push(std::move(counter));
}
read_alpha_map(rb);
}
// Reads the alpha map data from the provided readable buffer.
void read_alpha_map(BufferReadable& rb) {
uint64_t alpha_size = 0;
rb.read_var_uint(alpha_size);
for (size_t i = 0; i < alpha_size; ++i) {
uint64_t alpha = 0;
rb.read_var_uint(alpha);
alpha_map.push_back(alpha);
}
}
protected:
void push(std::unique_ptr<Counter> counter) {
counter->slot = counter_list.size();
auto* ptr = counter.get();
counter_list.push_back(std::move(counter));
counter_map[ptr->key] = ptr;
percolate(ptr);
}
void percolate(Counter* counter) {
while (counter->slot > 0) {
auto& next = counter_list[counter->slot - 1];
if (*counter > *next) {
std::swap(next->slot, counter->slot);
std::swap(counter_list[next->slot], counter_list[counter->slot]);
} else {
break;
}
}
}
private:
void destroy_elements() {
for (auto& counter : counter_list) {
arena.free(counter->key);
}
counter_map.clear();
counter_list.clear();
alpha_map.clear();
}
void destroy_last_element() {
auto& last_element = counter_list.back();
counter_map.erase(last_element->key);
arena.free(last_element->key);
counter_list.pop_back();
++removed_keys;
if (removed_keys * 2 > counter_map.size()) {
rebuild_counter_map();
}
}
Counter* find_counter(const TKey& key, size_t hash) {
auto it = counter_map.find(key, hash);
if (it == counter_map.end()) {
return nullptr;
}
return it->second;
}
void rebuild_counter_map() {
removed_keys = 0;
counter_map.clear();
for (auto& counter : counter_list) {
counter_map[counter->key] = counter.get();
}
}
using CounterMap = flat_hash_map<TKey, Counter*, Hash>;
CounterMap counter_map;
std::vector<std::unique_ptr<Counter>> counter_list;
std::vector<uint64_t> alpha_map;
SpaceSavingArena<TKey> arena;
size_t m_capacity = 0;
size_t removed_keys = 0;
};
} // namespace doris::vectorized