| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <rapidjson/document.h> |
| #include <rapidjson/prettywriter.h> |
| #include <rapidjson/stringbuffer.h> |
| |
| #include <boost/dynamic_bitset.hpp> |
| |
| #include "common/cast_set.h" |
| #include "vec/data_types/data_type_decimal.h" |
| #include "vec/io/io_helper.h" |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| template <typename T> |
| struct Bucket { |
| public: |
| Bucket() = default; |
| Bucket(T lower, T upper, size_t ndv, size_t count, size_t pre_sum) |
| : lower(lower), upper(upper), ndv(ndv), count(count), pre_sum(pre_sum) {} |
| |
| T lower; |
| T upper; |
| size_t ndv; |
| size_t count; |
| size_t pre_sum; |
| }; |
| |
| /** |
| * Checks if it is possible to assign the provided value_map to the given |
| * number of buckets such that no bucket has a size larger than max_bucket_size. |
| * |
| * @param value_map A mapping of values to their counts. |
| * @param max_bucket_size The maximum size that any bucket is allowed to have. |
| * @param num_buckets The number of buckets that we want to assign values to. |
| * |
| * @return true if the values can be assigned to the buckets, false otherwise. |
| */ |
| template <typename T> |
| bool can_assign_into_buckets(const std::map<T, size_t>& value_map, const size_t max_bucket_size, |
| const size_t num_buckets) { |
| if (value_map.empty()) { |
| return false; |
| }; |
| |
| size_t used_buckets = 1; |
| size_t current_bucket_size = 0; |
| |
| for (const auto& [value, count] : value_map) { |
| current_bucket_size += count; |
| |
| // If adding the current value to the current bucket would exceed max_bucket_size, |
| // then we start a new bucket. |
| if (current_bucket_size > max_bucket_size) { |
| ++used_buckets; |
| current_bucket_size = count; |
| } |
| |
| // If we have used more buckets than num_buckets, we cannot assign the values to buckets. |
| if (used_buckets > num_buckets) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Calculates the maximum number of values that can fit into each bucket given a set of values |
| * and the desired number of buckets. |
| * |
| * @tparam T the type of the values in the value map |
| * @param value_map the map of values and their counts |
| * @param num_buckets the desired number of buckets |
| * @return the maximum number of values that can fit into each bucket |
| */ |
| template <typename T> |
| size_t calculate_bucket_max_values(const std::map<T, size_t>& value_map, const size_t num_buckets) { |
| // Ensure that the value map is not empty |
| assert(!value_map.empty()); |
| |
| // Calculate the total number of values in the map using std::accumulate() |
| size_t total_values = 0; |
| for (const auto& [value, count] : value_map) { |
| total_values += count; |
| } |
| |
| // If there is only one bucket, then all values will be assigned to that bucket |
| if (num_buckets == 1) { |
| return total_values; |
| } |
| |
| // To calculate the maximum value count in each bucket, we first calculate a conservative upper |
| // bound, which is equal to 2 * total_values / (max_buckets - 1) + 1. This upper bound may exceed |
| // the actual maximum value count, but it does not underestimate it. The subsequent binary search |
| // algorithm will approach the actual maximum value count. |
| size_t upper_bucket_values = 2 * total_values / (num_buckets - 1) + 1; |
| |
| // Initialize the lower bound to 0 |
| size_t lower_bucket_values = 0; |
| |
| // Perform a binary search to find the maximum number of values that can fit into each bucket |
| int search_step = 0; |
| const int max_search_steps = |
| 10; // Limit the number of search steps to avoid excessive iteration |
| |
| while (upper_bucket_values > lower_bucket_values + 1 && search_step < max_search_steps) { |
| // Calculate the midpoint of the upper and lower bounds |
| const size_t bucket_values = (upper_bucket_values + lower_bucket_values) / 2; |
| |
| // Check if the given number of values can be assigned to the desired number of buckets |
| if (can_assign_into_buckets(value_map, bucket_values, num_buckets)) { |
| // If it can, then set the upper bound to the midpoint |
| upper_bucket_values = bucket_values; |
| } else { |
| // If it can't, then set the lower bound to the midpoint |
| lower_bucket_values = bucket_values; |
| } |
| // Increment the search step counter |
| ++search_step; |
| } |
| |
| return upper_bucket_values; |
| } |
| |
| /** |
| * Greedy equi-height histogram construction algorithm, inspired by the MySQL |
| * equi_height implementation(https://dev.mysql.com/doc/dev/mysql-server/latest/equi__height_8h.html). |
| * |
| * Given an ordered collection of [value, count] pairs and a maximum bucket |
| * size, construct a histogram by inserting values into a bucket while keeping |
| * track of its size. If the insertion of a value into a non-empty bucket |
| * causes the bucket to exceed the maximum size, create a new empty bucket and |
| * continue. |
| * |
| * The algorithm guarantees a selectivity estimation error of at most ~2 * |
| * #values / #buckets, often less. Values with a higher relative frequency are |
| * guaranteed to be placed in singleton buckets. |
| * |
| * The minimum composite bucket size is used to minimize the worst case |
| * selectivity estimation error. In general, the algorithm will adapt to the |
| * data distribution to minimize the size of composite buckets. The heavy values |
| * can be placed in singleton buckets and the remaining values will be evenly |
| * spread across the remaining buckets, leading to a lower composite bucket size. |
| * |
| * Note: The term "value" refers to an entry in a column and the actual value |
| * of an entry. The ordered_map is an ordered collection of [distinct value, |
| * value count] pairs. For example, a Value_map<String> could contain the pairs ["a", 1], ["b", 2] |
| * to represent one "a" value and two "b" values. |
| * |
| * @param buckets A vector of empty buckets that will be populated with data. |
| * @param ordered_map An ordered map of distinct values and their counts. |
| * @param max_num_buckets The maximum number of buckets that can be used. |
| * |
| * @return True if the buckets were successfully built, false otherwise. |
| */ |
| template <typename T> |
| bool build_histogram(std::vector<Bucket<T>>& buckets, const std::map<T, size_t>& ordered_map, |
| const size_t max_num_buckets) { |
| // If the input map is empty, there is nothing to build. |
| if (ordered_map.empty()) { |
| return false; |
| } |
| |
| // Calculate the maximum number of values that can be assigned to each bucket. |
| auto bucket_max_values = calculate_bucket_max_values(ordered_map, max_num_buckets); |
| |
| // Ensure that the capacity is at least max_num_buckets in order to avoid the overhead of additional |
| // allocations when inserting buckets. |
| buckets.clear(); |
| buckets.reserve(max_num_buckets); |
| |
| // Initialize bucket variables. |
| size_t distinct_values_count = 0; |
| size_t values_count = 0; |
| size_t cumulative_values = 0; |
| |
| // Record how many values still need to be assigned. |
| auto remaining_distinct_values = ordered_map.size(); |
| |
| auto it = ordered_map.begin(); |
| |
| // Lower value of the current bucket. |
| const T* lower_value = &it->first; |
| |
| // Iterate over the ordered map of distinct values and their counts. |
| for (; it != ordered_map.end(); ++it) { |
| const auto count = it->second; |
| const auto current_value = it->first; |
| |
| // Update the bucket counts and track the number of distinct values assigned. |
| distinct_values_count++; |
| remaining_distinct_values--; |
| values_count += count; |
| cumulative_values += count; |
| |
| // Check whether the current value should be added to the current bucket. |
| auto next = std::next(it); |
| size_t remaining_empty_buckets = max_num_buckets - buckets.size() - 1; |
| |
| if (next != ordered_map.end() && remaining_distinct_values > remaining_empty_buckets && |
| values_count + next->second <= bucket_max_values) { |
| // If the current value is the last in the input map and there are more remaining |
| // distinct values than empty buckets and adding the value does not cause the bucket |
| // to exceed its max size, skip adding the value to the current bucket. |
| continue; |
| } |
| |
| // Finalize the current bucket and add it to our collection of buckets. |
| auto pre_sum = cumulative_values - values_count; |
| |
| Bucket<T> new_bucket(*lower_value, current_value, distinct_values_count, values_count, |
| pre_sum); |
| buckets.push_back(new_bucket); |
| |
| // Reset variables for the next bucket. |
| if (next != ordered_map.end()) { |
| lower_value = &next->first; |
| } |
| values_count = 0; |
| distinct_values_count = 0; |
| } |
| |
| return true; |
| } |
| |
| template <typename T> |
| bool histogram_to_json(rapidjson::StringBuffer& buffer, const std::vector<Bucket<T>>& buckets, |
| const DataTypePtr& data_type) { |
| rapidjson::Document doc; |
| doc.SetObject(); |
| rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); |
| |
| int num_buckets = cast_set<int>(buckets.size()); |
| doc.AddMember("num_buckets", num_buckets, allocator); |
| |
| rapidjson::Value bucket_arr(rapidjson::kArrayType); |
| bucket_arr.Reserve(num_buckets, allocator); |
| |
| std::stringstream ss1; |
| std::stringstream ss2; |
| |
| rapidjson::Value lower_val; |
| rapidjson::Value upper_val; |
| |
| // Convert bucket's lower and upper to 2 columns |
| MutableColumnPtr lower_column = data_type->create_column(); |
| MutableColumnPtr upper_column = data_type->create_column(); |
| for (const auto& bucket : buckets) { |
| // String type is different, it has to pass in length |
| if constexpr (std::is_same_v<T, std::string>) { |
| lower_column->insert_data(bucket.lower.c_str(), bucket.lower.length()); |
| upper_column->insert_data(bucket.upper.c_str(), bucket.upper.length()); |
| } else { |
| lower_column->insert_data(reinterpret_cast<const char*>(&bucket.lower), 0); |
| upper_column->insert_data(reinterpret_cast<const char*>(&bucket.upper), 0); |
| } |
| } |
| size_t row_num = 0; |
| for (const auto& bucket : buckets) { |
| std::string lower_str = data_type->to_string(*lower_column, row_num); |
| std::string upper_str = data_type->to_string(*upper_column, row_num); |
| ++row_num; |
| lower_val.SetString(lower_str.data(), static_cast<rapidjson::SizeType>(lower_str.size()), |
| allocator); |
| upper_val.SetString(upper_str.data(), static_cast<rapidjson::SizeType>(upper_str.size()), |
| allocator); |
| |
| rapidjson::Value bucket_json(rapidjson::kObjectType); |
| bucket_json.AddMember("lower", lower_val, allocator); |
| bucket_json.AddMember("upper", upper_val, allocator); |
| bucket_json.AddMember("ndv", static_cast<int64_t>(bucket.ndv), allocator); |
| bucket_json.AddMember("count", static_cast<int64_t>(bucket.count), allocator); |
| bucket_json.AddMember("pre_sum", static_cast<int64_t>(bucket.pre_sum), allocator); |
| |
| bucket_arr.PushBack(bucket_json, allocator); |
| } |
| |
| doc.AddMember("buckets", bucket_arr, allocator); |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); |
| doc.Accept(writer); |
| |
| return !buckets.empty() && buffer.GetSize() > 0; |
| } |
| #include "common/compile_check_end.h" |
| } // namespace doris::vectorized |