/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef ENCODING_ACLUSTER_ENCODER_H
#define ENCODING_ACLUSTER_ENCODER_H
#include <vector>
#include <string>
#include <memory>
#include <cmath>
#include <numeric>
#include <algorithm>
#include <stdexcept>
#include <limits>
#include <unordered_set>
#include <random>
#include <sstream>
#include <functional>
#include <cstdint> // For uint8_t, uint32_t, etc.
#include <cstdlib> // For std::abs on integer types
#include "encoder.h"
#include "common/allocator/byte_stream.h"
#include "common/global.h"
// ===================================================================
// Part 1: Core ACluster Algorithm Logic
// ===================================================================
namespace AClusterLogic {
// Corresponds to: algorithms/cluster_common.h
template<typename T>
struct SoAData {
std::vector<std::vector<T>> columns;
int dim = 0;
int num_rows = 0;
};
// Corresponds to: algorithms/cluster_common.h
struct ClusteringResult {
std::vector<std::vector<long long>> medoids;
std::vector<int> cluster_assignment;
std::vector<long long> cluster_sizes;
};
// Corresponds to: algorithms/cluster_common.h
struct PreprocessorResult {
std::unique_ptr<SoAData<long long>> data;
std::vector<int> max_decimal_places;
std::vector<long long> min_values;
};
// Corresponds to: cluster_encoder_logic.cpp
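// A minimal append-only bit writer: bits are packed MSB-first into a growable byte
// vector, and merge() replays another buffer bit by bit so concatenation works
// regardless of byte alignment.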
struct BitBuffer {
std::vector<uint8_t> buffer;
size_t currentBitPos = 0;
inline void appendBit(bool bit) {
size_t byteIndex = currentBitPos / 8;
int bitIndexInByte = currentBitPos % 8;
if (byteIndex >= buffer.size()) buffer.push_back(0);
if (bit) buffer[byteIndex] |= (1 << (7 - bitIndexInByte));
currentBitPos++;
}
inline void merge(const BitBuffer& other) {
for (size_t i = 0; i < other.currentBitPos; ++i) {
size_t byteIndex = i / 8;
int bitOffset = 7 - (i % 8);
bool bit = (other.buffer[byteIndex] & (1 << bitOffset)) != 0;
appendBit(bit);
}
}
inline std::vector<uint8_t> toByteArray() const { return buffer; }
inline size_t size() const { return currentBitPos; }
};
namespace BitBufferUtils {
// Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
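// Writes the lowest `bits` bits of `num`, most significant bit first.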
inline void appendToBitstream(BitBuffer& bitBuffer, long long num, int bits) {
for (int i = bits - 1; i >= 0; --i) bitBuffer.appendBit((num >> i) & 1);
}
// Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
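// Number of bits needed to represent |value|, at least 1 (uses clzll where available).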
inline int bitsRequiredNoSign(long long value) {
if (value == 0) return 1;
unsigned long long u_value = (value > 0) ? value : -value;
#if defined(__GNUC__) || defined(__clang__)
return 64 - __builtin_clzll(u_value);
#else
int length = 0; while(u_value > 0){ u_value >>= 1; length++; } return length == 0 ? 1 : length;
#endif
}
}
namespace { // Anonymous namespace for internal linkage helpers
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for Preprocessor
inline int bitLength(long long value) {
if (value == 0) return 1;
unsigned long long u_value = (value > 0) ? value : -value;
#if defined(__GNUC__) || defined(__clang__)
return 64 - __builtin_clzll(u_value);
#else
int length = 0; while (u_value > 0) { u_value >>= 1; length++; } return length;
#endif
}
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
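// Cost model for the greedy clustering: encoding p1 relative to p2 costs
// bitLength(|residual|) + 1 sign bit per dimension.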
inline long long calculateTotalLogCost(const std::vector<long long>& p1, const std::vector<long long>& p2, int dim) {
long long totalCost = 0;
for (int i = 0; i < dim; ++i) {
long long residual = p1[i] - p2[i];
totalCost += bitLength(residual >= 0 ? residual : -residual) + 1;
}
return totalCost;
}
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
inline long long calculateBasePointStorageCost(const std::vector<long long>& basePoint, int dim) {
long long storageCost = 0;
for (int i = 0; i < dim; ++i) {
storageCost += bitLength(basePoint[i] >= 0 ? basePoint[i] : -basePoint[i]) + 1;
}
return storageCost;
}
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
inline std::vector<long long> get_point_from_soa(const SoAData<long long>& soa_data, int row_idx) {
std::vector<long long> point(soa_data.dim);
for (int d = 0; d < soa_data.dim; ++d) {
point[d] = soa_data.columns[d][row_idx];
}
return point;
}
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
inline std::vector<std::vector<long long>> convert_SoA_to_AoS(const SoAData<long long>& soa_data) {
if (soa_data.num_rows == 0) return {};
std::vector<std::vector<long long>> aos_data(soa_data.num_rows, std::vector<long long>(soa_data.dim));
for (int j = 0; j < soa_data.dim; ++j) {
for (int i = 0; i < soa_data.num_rows; ++i) {
aos_data[i][j] = soa_data.columns[j][i];
}
}
return aos_data;
}
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
struct VectorHasher {
inline std::size_t operator()(const std::vector<long long>& vec) const {
std::size_t seed = vec.size();
for(long long i : vec) { seed ^= std::hash<long long>()(i) + 0x9e3779b9 + (seed << 6) + (seed >> 2); }
return seed;
}
};
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
struct MedSortHelper {
std::vector<long long> medoid;
long long size;
int original_index;
inline bool operator<(const MedSortHelper& other) const { return size < other.size; }
};
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
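// Reorders clusters by ascending size and remaps point assignments to the new
// indices; ascending order keeps the size deltas written later non-negative.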
inline ClusteringResult sortResults(std::vector<std::vector<long long>>& medoids, std::vector<int>& assignment, std::vector<long long>& sizes) {
int k = medoids.size();
if (k == 0) return {{}, {}, {}};
std::vector<MedSortHelper> sorters;
sorters.reserve(k);
for(int i = 0; i < k; ++i) { sorters.push_back({medoids[i], sizes[i], i}); }
std::sort(sorters.begin(), sorters.end());
ClusteringResult res;
res.medoids.resize(k);
res.cluster_sizes.resize(k);
std::vector<int> old_to_new_map(k);
for(int i = 0; i < k; ++i) {
res.medoids[i] = sorters[i].medoid;
res.cluster_sizes[i] = sorters[i].size;
old_to_new_map[sorters[i].original_index] = i;
}
res.cluster_assignment.resize(assignment.size());
for(size_t i = 0; i < assignment.size(); ++i) {
if (assignment[i] != -1 && static_cast<size_t>(assignment[i]) < old_to_new_map.size()) {
res.cluster_assignment[i] = old_to_new_map[assignment[i]];
} else { res.cluster_assignment[i] = -1; }
}
return res;
}
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
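// Standard zigzag mapping (n >= 0 -> 2n, n < 0 -> -2n - 1) so small magnitudes of
// either sign get short codes; relies on arithmetic right shift for negatives.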
inline long long zigzagEncode_scalar(long long n) {
return (n << 1) ^ (n >> 63);
}
} // end anonymous namespace
// Corresponds to: cluster_encoder_logic.cpp :: adaptiveGreedyBasePointSelection
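// Single-pass greedy base-point selection: the first point seeds the first cluster;
// each later point joins the existing base point with the cheapest residual cost, and
// is promoted to a new base point only when the bits saved (its own residual plus the
// savings from reassigning cheaper points out of the nearest cluster) exceed the cost
// of storing the point itself. Results are returned sorted by cluster size.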
inline ClusteringResult adaptiveGreedyBasePointSelection(const SoAData<long long>& page_data, int dim) {
auto data_aos = convert_SoA_to_AoS(page_data);
size_t n = data_aos.size();
if (n == 0) return {{}, {}, {}};
std::vector<std::vector<long long>> medoids_list;
std::unordered_set<std::vector<long long>, VectorHasher> existing_medoids_set;
std::vector<std::unordered_set<int>> points_in_cluster_list;
std::vector<int> point_to_leader_map(n, -1);
medoids_list.push_back(data_aos[0]);
existing_medoids_set.insert(data_aos[0]);
points_in_cluster_list.emplace_back();
points_in_cluster_list[0].insert(0);
point_to_leader_map[0] = 0;
for (size_t i = 1; i < n; ++i) {
const auto& current_point = data_aos[i];
if (existing_medoids_set.count(current_point)) {
int existing_leader_index = -1;
for (size_t j = 0; j < medoids_list.size(); ++j) { if (medoids_list[j] == current_point) { existing_leader_index = j; break; } }
if (existing_leader_index != -1) { points_in_cluster_list[existing_leader_index].insert(i); point_to_leader_map[i] = existing_leader_index; }
continue;
}
int best_leader_index = -1;
long long min_cost_to_existing_leader = std::numeric_limits<long long>::max();
for (size_t j = 0; j < medoids_list.size(); ++j) {
long long cost = calculateTotalLogCost(current_point, medoids_list[j], dim);
if (cost < min_cost_to_existing_leader) { min_cost_to_existing_leader = cost; best_leader_index = j; }
}
long long savings_from_current_point = min_cost_to_existing_leader;
long long savings_from_reassignment = 0;
const auto& points_in_best_leader_cluster = points_in_cluster_list[best_leader_index];
for (int point_index_in_cluster : points_in_best_leader_cluster) {
const auto& p = data_aos[point_index_in_cluster];
long long cost_to_old_leader = calculateTotalLogCost(p, medoids_list[best_leader_index], dim);
long long cost_to_new_potential_leader = calculateTotalLogCost(p, current_point, dim);
if (cost_to_new_potential_leader < cost_to_old_leader) { savings_from_reassignment += (cost_to_old_leader - cost_to_new_potential_leader); }
}
long long total_savings = savings_from_current_point + savings_from_reassignment;
long long storage_cost_for_new_point = calculateBasePointStorageCost(current_point, dim);
if (total_savings > storage_cost_for_new_point) {
int new_leader_id = medoids_list.size();
medoids_list.push_back(current_point);
existing_medoids_set.insert(current_point);
points_in_cluster_list.emplace_back();
points_in_cluster_list.back().insert(i);
point_to_leader_map[i] = new_leader_id;
std::vector<int> points_to_reassign;
for (int point_idx : points_in_cluster_list[best_leader_index]) {
const auto& p = data_aos[point_idx];
long long cost_to_old = calculateTotalLogCost(p, medoids_list[best_leader_index], dim);
long long cost_to_new = calculateTotalLogCost(p, current_point, dim);
if (cost_to_new < cost_to_old) { points_to_reassign.push_back(point_idx); }
}
for (int point_idx : points_to_reassign) {
points_in_cluster_list[best_leader_index].erase(point_idx);
points_in_cluster_list[new_leader_id].insert(point_idx);
point_to_leader_map[point_idx] = new_leader_id;
}
} else {
points_in_cluster_list[best_leader_index].insert(i);
point_to_leader_map[i] = best_leader_index;
}
}
int k = medoids_list.size();
auto discovered_medoids = medoids_list;
std::vector<long long> raw_cluster_sizes(k);
for (int i = 0; i < k; ++i) { raw_cluster_sizes[i] = points_in_cluster_list[i].size(); }
return sortResults(discovered_medoids, point_to_leader_map, raw_cluster_sizes);
}
// Corresponds to: cluster_encoder_logic.cpp :: residualCalculationZigzag_sorted
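// Computes zigzag-encoded residuals of each point against its cluster's medoid and
// writes them grouped by cluster, using prefix sums of the (already sorted) cluster
// sizes as per-cluster write pointers.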
inline std::vector<std::vector<long long>> residualCalculationZigzag_sorted(
const SoAData<long long>& data, const std::vector<std::vector<long long>>& medoids,
const std::vector<int>& assignments, const std::vector<long long>& sorted_cluster_sizes, int dim)
{
int n = data.num_rows;
size_t k = medoids.size();
if (n == 0) return {};
std::vector<int> writePointers(k);
int cumulativeCount = 0;
for (size_t i = 0; i < k; ++i) {
writePointers[i] = cumulativeCount;
cumulativeCount += static_cast<int>(sorted_cluster_sizes[i]);
}
std::vector<std::vector<long long>> sortedResidualSeries(n, std::vector<long long>(dim));
for (int i = 0; i < n; i++) {
int clusterId = assignments[i];
if (clusterId < 0 || static_cast<size_t>(clusterId) >= k) continue;
const auto& base = medoids[clusterId];
int targetIndex = writePointers[clusterId];
for (int j = 0; j < dim; j++) {
long long residual = data.columns[j][i] - base[j];
sortedResidualSeries[targetIndex][j] = zigzagEncode_scalar(residual);
}
writePointers[clusterId]++;
}
return sortedResidualSeries;
}
// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
namespace {
struct BasePointsResult { std::vector<long long> min_base; std::vector<int> max_base_bit_len; BitBuffer base_bitstream; };
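// Base points are stored column-wise as offsets from the per-dimension minimum,
// bit-packed with one fixed width per dimension (the widest offset in that dimension).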
inline BasePointsResult generateBitstreamEncodedBasePointsOptimized(const std::vector<std::vector<long long>>& medoids, int dim) {
if (medoids.empty()) return {std::vector<long long>(dim, 0), std::vector<int>(dim, 0), BitBuffer()};
size_t k = medoids.size();
std::vector<long long> min_base(dim, std::numeric_limits<long long>::max());
for (const auto& point : medoids) for (int i = 0; i < dim; ++i) min_base[i] = std::min(min_base[i], point[i]);
std::vector<std::vector<long long>> offset_medoids(k, std::vector<long long>(dim));
std::vector<int> max_base_bit_len(dim, 0);
for (size_t i = 0; i < k; ++i) {
for (int j = 0; j < dim; ++j) {
long long offset_value = medoids[i][j] - min_base[j];
offset_medoids[i][j] = offset_value;
max_base_bit_len[j] = std::max(max_base_bit_len[j], BitBufferUtils::bitsRequiredNoSign(offset_value));
}
}
BitBuffer base_bitstream;
for (const auto& offset_point : offset_medoids) for (int i = 0; i < dim; ++i) BitBufferUtils::appendToBitstream(base_bitstream, offset_point[i], max_base_bit_len[i]);
return {min_base, max_base_bit_len, base_bitstream};
}
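// Cluster sizes are delta-encoded (sorted ascending, so deltas are non-negative) and
// packed in blocks of block_size values; all 8-bit per-block widths are emitted first,
// followed by the packed deltas.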
inline BitBuffer generateBitstreamFrequency(const std::vector<long long>& cluster_size, int block_size) {
if (cluster_size.empty()) return BitBuffer();
BitBuffer freq_bit, freq_meta;
std::vector<long long> cluster_delta(cluster_size.size());
cluster_delta[0] = cluster_size[0];
for (size_t i = 1; i < cluster_size.size(); ++i) cluster_delta[i] = cluster_size[i] - cluster_size[i - 1];
int total_length = cluster_size.size();
int num_blocks = (total_length + block_size - 1) / block_size;
for (int block_idx = 0; block_idx < num_blocks; ++block_idx) {
int start = block_idx * block_size;
int end = std::min(start + block_size, total_length);
long long max_frequency = 0;
for (int i = start; i < end; ++i) max_frequency = std::max(max_frequency, cluster_delta[i]);
int max_freq_bit = BitBufferUtils::bitsRequiredNoSign(max_frequency);
BitBufferUtils::appendToBitstream(freq_meta, max_freq_bit, 8);
for (int i = start; i < end; ++i) BitBufferUtils::appendToBitstream(freq_bit, cluster_delta[i], max_freq_bit);
}
freq_meta.merge(freq_bit);
return freq_meta;
}
struct EncodedDataResult { BitBuffer residual_bitstream; std::vector<std::vector<int>> pack_res_metadata; int pack_num; int total_data_points; };
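// Residual rows are packed pack_size at a time; each pack records its per-dimension
// maximum bit width in pack_res_metadata and writes every value with that width.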
inline EncodedDataResult generateBitstreamEncodedData(const std::vector<std::vector<long long>>& residual_series, int pack_size, int dim) {
int total_data_points = residual_series.size();
if (total_data_points == 0) return {BitBuffer(), {}, 0, 0};
int pack_num = (total_data_points + pack_size - 1) / pack_size;
BitBuffer residual_bit;
std::vector<std::vector<int>> pack_res_metadata(pack_num, std::vector<int>(dim));
int pack_index = 0;
for (int pack_start = 0; pack_start < total_data_points; pack_start += pack_size) {
int pack_end = std::min(pack_start + pack_size, total_data_points);
std::vector<int> max_residual_bits(dim, 0);
for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) max_residual_bits[j] = std::max(max_residual_bits[j], BitBufferUtils::bitsRequiredNoSign(residual_series[i][j]));
pack_res_metadata[pack_index] = max_residual_bits;
for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) BitBufferUtils::appendToBitstream(residual_bit, residual_series[i][j], max_residual_bits[j]);
pack_index++;
}
return {residual_bit, pack_res_metadata, pack_num, total_data_points};
}
} // end anonymous namespace
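// Derives the maximum number of decimal places from std::to_string (which fixes the
// fractional part at 6 digits), scales every value to an integer, and subtracts the
// page minimum so all values are non-negative; the decimal-place count and the minimum
// are written into the page header by flush().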
inline std::unique_ptr<PreprocessorResult> preprocessPageFromDoubles(const std::vector<double>& page_doubles, int& dim) {
dim = 1;
if (page_doubles.empty()) return nullptr;
int max_decimal_places = 0;
for(double val : page_doubles) {
std::string s = std::to_string(val);
auto dot_pos = s.find('.');
if (dot_pos != std::string::npos) {
s.erase(s.find_last_not_of('0') + 1, std::string::npos);
if (!s.empty() && s.back() == '.') s.pop_back();
max_decimal_places = std::max(max_decimal_places, (int)(s.length() - dot_pos - 1));
}
}
double scale = std::pow(10, max_decimal_places);
auto integerSoA = std::make_unique<SoAData<long long>>();
integerSoA->dim = 1;
integerSoA->num_rows = page_doubles.size();
integerSoA->columns.resize(1);
integerSoA->columns[0].reserve(page_doubles.size());
for(double val : page_doubles) {
// Round rather than truncate: val * scale may land just below the intended integer.
integerSoA->columns[0].push_back(std::llround(val * scale));
}
long long min_val_long = 0;
if(!integerSoA->columns[0].empty()){
min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
}
for (long long& val : integerSoA->columns[0]) {
val -= min_val_long;
}
return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {max_decimal_places}, {min_val_long}});
}
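// NOTE: IntAClusterEncoder::flush below calls preprocessPageFromLongs, which is not
// defined in this header. The following is a minimal sketch assumed by analogy with
// preprocessPageFromDoubles: integer input needs no decimal scaling
// (max_decimal_places = 0), so values are only shifted by the page minimum.
inline std::unique_ptr<PreprocessorResult> preprocessPageFromLongs(const std::vector<long long>& page_longs, int& dim) {
    dim = 1;
    if (page_longs.empty()) return nullptr;
    auto integerSoA = std::make_unique<SoAData<long long>>();
    integerSoA->dim = 1;
    integerSoA->num_rows = static_cast<int>(page_longs.size());
    integerSoA->columns.resize(1);
    integerSoA->columns[0] = page_longs;
    // Shift by the per-page minimum so every value (and hence every base point) is non-negative.
    long long min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
    for (long long& val : integerSoA->columns[0]) {
        val -= min_val_long;
    }
    return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {0}, {min_val_long}});
}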
} // namespace AClusterLogic
namespace storage {
// Base class for holding common data for ACluster encoders.
template<typename BufferType>
class AClusterEncoderData {
protected:
AClusterEncoderData() {
page_size_ = common::g_config_value_.page_writer_max_point_num_;
points_buffer_.reserve(page_size_);
}
void reset_data() {
points_buffer_.clear();
}
std::vector<BufferType> points_buffer_;
uint32_t page_size_;
};
// ====================================================================
// DoubleAClusterEncoder for FLOAT and DOUBLE
// ====================================================================
class DoubleAClusterEncoder : public Encoder, private AClusterEncoderData<double> {
public:
DoubleAClusterEncoder() = default;
~DoubleAClusterEncoder() override {}
void reset() override { reset_data(); }
int get_max_byte_size() override {
return page_size_ * sizeof(double) * 2;
}
// --- Overrides for supported types ---
int encode(double value, common::ByteStream &out_stream) override {
points_buffer_.push_back(value);
return common::E_OK;
}
int encode(float value, common::ByteStream &out_stream) override {
points_buffer_.push_back(static_cast<double>(value));
return common::E_OK;
}
// --- Override for flush ---
int flush(common::ByteStream &out_stream) override;
// --- Overrides for unsupported types ---
int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
int encode(int32_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
int encode(int64_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
};
// ====================================================================
// IntAClusterEncoder for INT32 and INT64
// ====================================================================
class IntAClusterEncoder : public Encoder, private AClusterEncoderData<long long> {
public:
IntAClusterEncoder() = default;
~IntAClusterEncoder() override {}
void reset() override { reset_data(); }
int get_max_byte_size() override {
return page_size_ * sizeof(long long) * 2;
}
// --- Overrides for supported types ---
int encode(int64_t value, common::ByteStream &out_stream) override {
points_buffer_.push_back(value);
return common::E_OK;
}
int encode(int32_t value, common::ByteStream &out_stream) override {
points_buffer_.push_back(static_cast<long long>(value));
return common::E_OK;
}
// --- Override for flush ---
int flush(common::ByteStream &out_stream) override;
// --- Overrides for unsupported types ---
int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
int encode(float, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
int encode(double, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
};
// ====================================================================
// Implementation of flush methods
// ====================================================================
// --- FLUSH IMPLEMENTATION FOR DoubleAClusterEncoder ---
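// Page layout produced below (all fields bit-packed, MSB first):
//   [k:16][point count:16]
//   per dimension: [decimal places:8][min-value bit length:8][sign:1][|min value|]
//   per dimension: [min-base bit length:8][sign:1][|min base|][base-point width:8]
//   [pack count:16], then per pack and per dimension: [residual width:8]
//   followed by the base-point, cluster-frequency, and residual bitstreams.
// The byte array is prefixed with a 4-byte (host-endian) size. Note that the 16-bit
// fields cap base points and data points at 65535 per page.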
inline int DoubleAClusterEncoder::flush(common::ByteStream &out_stream) {
if (points_buffer_.empty()) {
return common::E_OK;
}
try {
int dim = 1;
int pack_size = 10;
int block_size = 10;
auto preproc_res_ptr = AClusterLogic::preprocessPageFromDoubles(points_buffer_, dim);
if (!preproc_res_ptr || !preproc_res_ptr->data) {
reset();
return common::E_OK;
}
// Run ACluster's adaptive greedy base-point selection on the preprocessed page.
AClusterLogic::ClusteringResult clustering_res = AClusterLogic::adaptiveGreedyBasePointSelection(*preproc_res_ptr->data, dim);
using namespace AClusterLogic;
int page_k = clustering_res.medoids.size();
BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
BitBuffer single_page_bitstream;
BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
for (int j = 0; j < dim; ++j) {
BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
long long val = preproc_res_ptr->min_values[j];
long long abs_val = std::abs(val);
int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8);
BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1);
BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
}
for (int j = 0; j < dim; ++j) {
long long val = basepoint_res.min_base[j];
long long abs_val = std::abs(val);
int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8);
BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1);
BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
}
BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
single_page_bitstream.merge(basepoint_res.base_bitstream);
single_page_bitstream.merge(fre_bitstream);
single_page_bitstream.merge(bitstream_res.residual_bitstream);
std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
uint32_t data_size = compressed_bytes.size();
out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
if (data_size > 0) {
out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
}
} catch (const std::exception& e) {
reset();
return common::E_ENCODING_ERROR;
}
reset();
return common::E_OK;
}
// --- FLUSH IMPLEMENTATION FOR IntAClusterEncoder ---
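// Identical to the double flush above except for the preprocessing step, which
// operates on the integer buffer directly.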
inline int IntAClusterEncoder::flush(common::ByteStream &out_stream) {
if (points_buffer_.empty()) {
return common::E_OK;
}
try {
int dim = 1;
int pack_size = 10;
int block_size = 10;
auto preproc_res_ptr = AClusterLogic::preprocessPageFromLongs(points_buffer_, dim);
if (!preproc_res_ptr || !preproc_res_ptr->data) {
reset();
return common::E_OK;
}
// Run ACluster's adaptive greedy base-point selection on the preprocessed page.
AClusterLogic::ClusteringResult clustering_res = AClusterLogic::adaptiveGreedyBasePointSelection(*preproc_res_ptr->data, dim);
using namespace AClusterLogic;
int page_k = clustering_res.medoids.size();
BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
BitBuffer single_page_bitstream;
BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
for (int j = 0; j < dim; ++j) {
BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
long long val = preproc_res_ptr->min_values[j];
long long abs_val = std::abs(val);
int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8);
BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1);
BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
}
for (int j = 0; j < dim; ++j) {
long long val = basepoint_res.min_base[j];
long long abs_val = std::abs(val);
int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8);
BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1);
BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
}
BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
single_page_bitstream.merge(basepoint_res.base_bitstream);
single_page_bitstream.merge(fre_bitstream);
single_page_bitstream.merge(bitstream_res.residual_bitstream);
std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
uint32_t data_size = compressed_bytes.size();
out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
if (data_size > 0) {
out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
}
} catch (const std::exception& e) {
reset();
return common::E_ENCODING_ERROR;
}
reset();
return common::E_OK;
}
} // namespace storage
#endif // ENCODING_ACLUSTER_ENCODER_H