Research/cluster compress (#614)
diff --git a/cpp/src/common/db_common.h b/cpp/src/common/db_common.h
index 8f87e5d..9c8ec0d 100644
--- a/cpp/src/common/db_common.h
+++ b/cpp/src/common/db_common.h
@@ -52,6 +52,8 @@
GORILLA = 8,
ZIGZAG = 9,
FREQ = 10,
+ KCLUSTER = 11,
+ ACLUSTER = 12,
INVALID_ENCODING = 255
};
@@ -68,7 +70,7 @@
};
extern const char* s_data_type_names[8];
-extern const char* s_encoding_names[12];
+extern const char* s_encoding_names[14];
extern const char* s_compression_names[8];
FORCE_INLINE const char* get_data_type_name(TSDataType type) {
@@ -77,7 +79,7 @@
}
FORCE_INLINE const char* get_encoding_name(TSEncoding encoding) {
- ASSERT(encoding >= PLAIN && encoding <= FREQ);
+ ASSERT(encoding >= PLAIN && encoding <= ACLUSTER);
return s_encoding_names[encoding];
}
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index 7913a2b..7189abe 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -90,9 +90,9 @@
const char* s_data_type_names[8] = {"BOOLEAN", "INT32", "INT64", "FLOAT",
"DOUBLE", "TEXT", "VECTOR", "STRING"};
-const char* s_encoding_names[12] = {
+const char* s_encoding_names[14] = {
"PLAIN", "DICTIONARY", "RLE", "DIFF", "TS_2DIFF", "BITMAP",
- "GORILLA_V1", "REGULAR", "GORILLA", "ZIGZAG", "FREQ"};
+ "GORILLA_V1", "REGULAR", "GORILLA", "ZIGZAG", "FREQ", "KCLUSTER", "ACLUSTER"};
const char* s_compression_names[8] = {
"UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "SDT", "PAA", "PLA", "LZ4",
diff --git a/cpp/src/encoding/acluster_decoder.h b/cpp/src/encoding/acluster_decoder.h
new file mode 100644
index 0000000..7d8c81e
--- /dev/null
+++ b/cpp/src/encoding/acluster_decoder.h
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ENCODING_ACLUSTER_DECODER_H
+#define ENCODING_ACLUSTER_DECODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <stdexcept>
+#include <limits>
+#include <cstdint>
+
+#include "decoder.h"
+#include "common/allocator/byte_stream.h"
+
+// ===================================================================
+// Part 1: Core ACluster Decoding Logic
+// ===================================================================
+namespace AClusterDecodeLogic {
+// The entire logic here is identical to KClusterDecodeLogic, as they share the same decode_multidim_impl
+// Corresponds to: cluster_encoder_logic.cpp :: BitStreamReader
+class BitStreamReader {
+public:
+ explicit BitStreamReader(const std::vector<uint8_t>& data)
+ : buffer(data), bufferSize(data.size() * 8), currentBitPos(0) {}
+
+ inline long long readBits(int numBits) {
+ if (numBits < 0 || numBits > 64) throw std::invalid_argument("Cannot read " + std::to_string(numBits) + " bits.");
+ if (currentBitPos + numBits > bufferSize) throw std::out_of_range("Not enough bits in stream to read " + std::to_string(numBits) + " bits.");
+
+ long long value = 0;
+ for (int i = 0; i < numBits; ++i) {
+ size_t byteIndex = currentBitPos / 8;
+ int bitIndex = 7 - (currentBitPos % 8);
+ bool bit = (buffer[byteIndex] & (1 << bitIndex)) != 0;
+ value = (value << 1) | (bit ? 1LL : 0LL);
+ currentBitPos++;
+ }
+ return value;
+ }
+
+ inline bool has_more() const {
+ return currentBitPos < bufferSize;
+ }
+
+private:
+ const std::vector<uint8_t>& buffer;
+ const size_t bufferSize;
+ size_t currentBitPos;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp :: zigzagDecode
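+// Maps the unsigned encoding back to signed values: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...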
+inline long long zigzagDecode(long long n) { return (n >> 1) ^ (-(n & 1)); }
+
+// Corresponds to: cluster_encoder_logic.cpp :: decode_multidim_impl
+inline std::vector<std::vector<double>> decode_multidim_impl(const std::vector<uint8_t>& compressed_data) {
+ if (compressed_data.empty()) return {};
+ BitStreamReader reader(compressed_data);
+
+ int dim = reader.readBits(8);
+ int pack_size = reader.readBits(16);
+ int block_size = reader.readBits(16);
+ int page_count = reader.readBits(32);
+ int page_size = reader.readBits(16);
+ (void)page_size; // page_size is part of the header but not needed during decoding (pack_size is used below)
+
+ std::vector<double> pow10_lookup; for (int i = 0; i < 20; ++i) pow10_lookup.push_back(std::pow(10, i));
+ std::vector<std::vector<double>> all_data_rows;
+
+ for (int i = 0; i < page_count; ++i) {
+ int k = reader.readBits(16);
+ int page_data_points = reader.readBits(16);
+
+ std::vector<int> max_decimals(dim);
+ std::vector<long long> min_values(dim);
+ for (int j = 0; j < dim; ++j) {
+ max_decimals[j] = reader.readBits(8);
+ int bit_len = reader.readBits(8);
+ long long sign = reader.readBits(1);
+ long long abs_val = reader.readBits(bit_len);
+ min_values[j] = (sign == 0) ? abs_val : -abs_val;
+ }
+
+ std::vector<long long> min_bases(dim);
+ std::vector<int> max_base_bit_len(dim);
+ for (int j = 0; j < dim; ++j) {
+ int bit_len = reader.readBits(8);
+ long long sign = reader.readBits(1);
+ long long abs_val = reader.readBits(bit_len);
+ min_bases[j] = (sign == 0) ? abs_val : -abs_val;
+ max_base_bit_len[j] = reader.readBits(8);
+ }
+
+ int pack_num = reader.readBits(16);
+ std::vector<std::vector<int>> pack_metadata(pack_num, std::vector<int>(dim));
+ for (int p = 0; p < pack_num; ++p) for (int j = 0; j < dim; ++j) pack_metadata[p][j] = reader.readBits(8);
+
+ std::vector<std::vector<long long>> medoids_long(k, std::vector<long long>(dim));
+ for (int m = 0; m < k; ++m) for (int j = 0; j < dim; ++j) medoids_long[m][j] = reader.readBits(max_base_bit_len[j]) + min_bases[j];
+
+ std::vector<long long> cluster_sizes(k, 0);
+ if (k > 0) {
+ int num_freq_blocks = (k + block_size - 1) / block_size;
+ std::vector<int> max_bits_per_block(num_freq_blocks);
+ for (int b = 0; b < num_freq_blocks; ++b) max_bits_per_block[b] = reader.readBits(8);
+ std::vector<long long> deltas(k);
+ int freq_idx = 0;
+ for (int b = 0; b < num_freq_blocks && freq_idx < k; ++b) {
+ int max_bit_for_this_block = max_bits_per_block[b];
+ int end = std::min((b + 1) * block_size, k);
+ for (int j = freq_idx; j < end; ++j) deltas[j] = reader.readBits(max_bit_for_this_block);
+ freq_idx = end;
+ }
+ cluster_sizes[0] = deltas[0];
+ for (size_t j = 1; j < deltas.size(); ++j) cluster_sizes[j] = cluster_sizes[j - 1] + deltas[j];
+ }
+
+ std::vector<std::vector<long long>> residual_series(page_data_points, std::vector<long long>(dim));
+ int data_counter = 0;
+ if (page_data_points > 0) {
+ for (int p = 0; p < pack_num; ++p) {
+ const auto& bits = pack_metadata[p];
+ int points_in_pack = (p == pack_num - 1) ? (page_data_points - (p * pack_size)) : pack_size;
+ for (int pt = 0; pt < points_in_pack; ++pt) {
+ if (data_counter >= page_data_points) break;
+ for (int j = 0; j < dim; ++j) residual_series[data_counter][j] = zigzagDecode(reader.readBits(bits[j]));
+ data_counter++;
+ }
+ }
+ }
+
+ if (page_data_points > 0 && k > 0) {
+ int current_point_idx = 0;
+ for (int medoid_idx = 0; medoid_idx < k; ++medoid_idx) {
+ long long points_in_this_cluster = cluster_sizes[medoid_idx];
+ const auto& base_point = medoids_long[medoid_idx];
+ for (int p_count = 0; p_count < points_in_this_cluster; ++p_count) {
+ if (current_point_idx < page_data_points) {
+ std::vector<double> row(dim);
+ for (int j = 0; j < dim; ++j) {
+ long long int_val = base_point[j] + residual_series[current_point_idx][j] + min_values[j];
+ row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+ }
+ all_data_rows.push_back(row);
+ current_point_idx++;
+ }
+ }
+ }
+ } else if (page_data_points > 0) {
+ for (int d = 0; d < page_data_points; ++d) {
+ std::vector<double> row(dim);
+ for (int j = 0; j < dim; ++j) {
+ long long int_val = residual_series[d][j] + min_values[j];
+ row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+ }
+ all_data_rows.push_back(row);
+ }
+ }
+ }
+ return all_data_rows;
+}
+
+} // namespace AClusterDecodeLogic
+
+
+// ===================================================================
+// Part 2: The Decoder classes that adapt the logic for TsFile
+// ===================================================================
+namespace storage {
+
+template<typename T>
+class AClusterDecoderBase : public Decoder {
+public:
+ AClusterDecoderBase() : current_read_index_(0), decoded_(false) {}
+ ~AClusterDecoderBase() override {}
+
+ int read_boolean(bool&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_int32(int32_t&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_int64(long long&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_float(float&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_double(double&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_String(common::String&, common::PageArena&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+
+protected:
+ void decode_page_if_needed(common::ByteStream &in);
+ std::vector<T> decoded_points_;
+ size_t current_read_index_;
+ bool decoded_;
+};
+
+class DoubleAClusterDecoder : public AClusterDecoderBase<double> {
+public:
+ int read_double(double& val, common::ByteStream& in) override;
+ int read_float(float& val, common::ByteStream& in) override;
+};
+
+class IntAClusterDecoder : public AClusterDecoderBase<long long> {
+public:
+ int read_int64(long long& val, common::ByteStream& in) override;
+ int read_int32(int32_t& val, common::ByteStream& in) override;
+};
+
+
+// --- Implementation ---
+template<typename T>
+inline void AClusterDecoderBase<T>::decode_page_if_needed(common::ByteStream &in) {
+ if (decoded_ || !in.has_remaining()) return;
+
+ uint32_t data_size = 0;
+ uint32_t read_len = 0;
+ in.read_buf(reinterpret_cast<char*>(&data_size), sizeof(data_size), read_len);
+ if (read_len != sizeof(data_size)) throw std::runtime_error("Failed to read ACluster data block size");
+ if (data_size == 0) {
+ decoded_ = true;
+ return;
+ }
+
+ std::vector<uint8_t> compressed_block(data_size);
+ in.read_buf(reinterpret_cast<char*>(compressed_block.data()), data_size, read_len);
+ if (read_len != data_size) throw std::runtime_error("Failed to read ACluster data block");
+
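+ // decode_multidim_impl expects the full multi-page stream format, so prepend a synthetic
+ // global header (dim / pack_size / block_size / page_count / page_size) using the same
+ // constants the encoder's flush() assumes for a single page.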
+ AClusterLogic::BitBuffer fake_bytestream;
+ int dim = 1, pack_size = 10, block_size = 10, page_count = 1, page_size = 0;
+ AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, dim, 8);
+ AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, pack_size, 16);
+ AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, block_size, 16);
+ AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_count, 32);
+ AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_size, 16);
+
+ std::vector<uint8_t> final_bytes_to_decode = fake_bytestream.toByteArray();
+ final_bytes_to_decode.insert(final_bytes_to_decode.end(), compressed_block.begin(), compressed_block.end());
+
+ std::vector<std::vector<double>> result_multidim = AClusterDecodeLogic::decode_multidim_impl(final_bytes_to_decode);
+
+ if (!result_multidim.empty()) {
+ decoded_points_.reserve(result_multidim.size());
+ for(const auto& row : result_multidim) {
+ if (!row.empty()) {
+ decoded_points_.push_back(static_cast<T>(row[0]));
+ }
+ }
+ }
+ decoded_ = true;
+}
+
+inline int DoubleAClusterDecoder::read_double(double& val, common::ByteStream& in) {
+ try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+ if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+ val = decoded_points_[current_read_index_++];
+ return common::E_OK;
+}
+inline int DoubleAClusterDecoder::read_float(float& val, common::ByteStream& in) {
+ double d_val;
+ int ret = read_double(d_val, in);
+ if (ret == common::E_OK) val = static_cast<float>(d_val);
+ return ret;
+}
+
+inline int IntAClusterDecoder::read_int64(long long& val, common::ByteStream& in) {
+ try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+ if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+ val = decoded_points_[current_read_index_++];
+ return common::E_OK;
+}
+inline int IntAClusterDecoder::read_int32(int32_t& val, common::ByteStream& in) {
+ long long ll_val;
+ int ret = read_int64(ll_val, in);
+ if (ret == common::E_OK) {
+ if (ll_val < std::numeric_limits<int32_t>::min() || ll_val > std::numeric_limits<int32_t>::max()) {
+ return common::E_DECODING_ERROR;
+ }
+ val = static_cast<int32_t>(ll_val);
+ }
+ return ret;
+}
+
+} // namespace storage
+#endif // ENCODING_ACLUSTER_DECODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/acluster_encoder.h b/cpp/src/encoding/acluster_encoder.h
new file mode 100644
index 0000000..1f7625b
--- /dev/null
+++ b/cpp/src/encoding/acluster_encoder.h
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ENCODING_ACLUSTER_ENCODER_H
+#define ENCODING_ACLUSTER_ENCODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <numeric>
+#include <algorithm>
+#include <stdexcept>
+#include <limits>
+#include <unordered_set>
+#include <random>
+#include <sstream>
+#include <functional>
+#include <cstdint> // For uint8_t, size_t, etc.
+
+#include "encoder.h"
+#include "common/allocator/byte_stream.h"
+#include "common/global.h"
+
+// ===================================================================
+// Part 1: Core ACluster Algorithm Logic
+// ===================================================================
+
+namespace AClusterLogic {
+
+// Corresponds to: algorithms/cluster_common.h
+template<typename T>
+struct SoAData {
+ std::vector<std::vector<T>> columns;
+ int dim = 0;
+ int num_rows = 0;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct ClusteringResult {
+ std::vector<std::vector<long long>> medoids;
+ std::vector<int> cluster_assignment;
+ std::vector<long long> cluster_sizes;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct PreprocessorResult {
+ std::unique_ptr<SoAData<long long>> data;
+ std::vector<int> max_decimal_places;
+ std::vector<long long> min_values;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp
+struct BitBuffer {
+ std::vector<uint8_t> buffer;
+ size_t currentBitPos = 0;
+
+ inline void appendBit(bool bit) {
+ size_t byteIndex = currentBitPos / 8;
+ int bitIndexInByte = currentBitPos % 8;
+ if (byteIndex >= buffer.size()) buffer.push_back(0);
+ if (bit) buffer[byteIndex] |= (1 << (7 - bitIndexInByte));
+ currentBitPos++;
+ }
+
+ inline void merge(const BitBuffer& other) {
+ for (size_t i = 0; i < other.currentBitPos; ++i) {
+ size_t byteIndex = i / 8;
+ int bitOffset = 7 - (i % 8);
+ bool bit = (other.buffer[byteIndex] & (1 << bitOffset)) != 0;
+ appendBit(bit);
+ }
+ }
+
+ inline std::vector<uint8_t> toByteArray() const { return buffer; }
+ inline size_t size() const { return currentBitPos; }
+};
+
+namespace BitBufferUtils {
+ // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
+ inline void appendToBitstream(BitBuffer& bitBuffer, long long num, int bits) {
+ for (int i = bits - 1; i >= 0; --i) bitBuffer.appendBit((num >> i) & 1);
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
+ inline int bitsRequiredNoSign(long long value) {
+ if (value == 0) return 1;
+ unsigned long long u_value = (value > 0) ? value : -value;
+ #if defined(__GNUC__) || defined(__clang__)
+ return 64 - __builtin_clzll(u_value);
+ #else
+ int length = 0; while(u_value > 0){ u_value >>= 1; length++; } return length == 0 ? 1 : length;
+ #endif
+ }
+}
+
+namespace { // Anonymous namespace for internal linkage helpers
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for Preprocessor
+ inline int bitLength(long long value) {
+ if (value == 0) return 1;
+ unsigned long long u_value = (value > 0) ? value : -value;
+ #if defined(__GNUC__) || defined(__clang__)
+ return 64 - __builtin_clzll(u_value);
+ #else
+ int length = 0; while (u_value > 0) { u_value >>= 1; length++; } return length;
+ #endif
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline long long calculateTotalLogCost(const std::vector<long long>& p1, const std::vector<long long>& p2, int dim) {
+ long long totalCost = 0;
+ for (int i = 0; i < dim; ++i) {
+ long long residual = p1[i] - p2[i];
+ totalCost += bitLength(residual >= 0 ? residual : -residual) + 1;
+ }
+ return totalCost;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline long long calculateBasePointStorageCost(const std::vector<long long>& basePoint, int dim) {
+ long long storageCost = 0;
+ for (int i = 0; i < dim; ++i) {
+ storageCost += bitLength(basePoint[i] >= 0 ? basePoint[i] : -basePoint[i]) + 1;
+ }
+ return storageCost;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline std::vector<long long> get_point_from_soa(const SoAData<long long>& soa_data, int row_idx) {
+ std::vector<long long> point(soa_data.dim);
+ for (int d = 0; d < soa_data.dim; ++d) {
+ point[d] = soa_data.columns[d][row_idx];
+ }
+ return point;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline std::vector<std::vector<long long>> convert_SoA_to_AoS(const SoAData<long long>& soa_data) {
+ if (soa_data.num_rows == 0) return {};
+ std::vector<std::vector<long long>> aos_data(soa_data.num_rows, std::vector<long long>(soa_data.dim));
+ for (int j = 0; j < soa_data.dim; ++j) {
+ for (int i = 0; i < soa_data.num_rows; ++i) {
+ aos_data[i][j] = soa_data.columns[j][i];
+ }
+ }
+ return aos_data;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ struct VectorHasher {
+ inline std::size_t operator()(const std::vector<long long>& vec) const {
+ std::size_t seed = vec.size();
+ for(long long i : vec) { seed ^= std::hash<long long>()(i) + 0x9e3779b9 + (seed << 6) + (seed >> 2); }
+ return seed;
+ }
+ };
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ struct MedSortHelper {
+ std::vector<long long> medoid;
+ long long size;
+ int original_index;
+ inline bool operator<(const MedSortHelper& other) const { return size < other.size; }
+ };
+
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline ClusteringResult sortResults(std::vector<std::vector<long long>>& medoids, std::vector<int>& assignment, std::vector<long long>& sizes) {
+ int k = medoids.size();
+ if (k == 0) return {{}, {}, {}};
+ std::vector<MedSortHelper> sorters;
+ sorters.reserve(k);
+ for(int i = 0; i < k; ++i) { sorters.push_back({medoids[i], sizes[i], i}); }
+ std::sort(sorters.begin(), sorters.end());
+ ClusteringResult res;
+ res.medoids.resize(k);
+ res.cluster_sizes.resize(k);
+ std::vector<int> old_to_new_map(k);
+ for(int i = 0; i < k; ++i) {
+ res.medoids[i] = sorters[i].medoid;
+ res.cluster_sizes[i] = sorters[i].size;
+ old_to_new_map[sorters[i].original_index] = i;
+ }
+ res.cluster_assignment.resize(assignment.size());
+ for(size_t i = 0; i < assignment.size(); ++i) {
+ if (assignment[i] != -1 && static_cast<size_t>(assignment[i]) < old_to_new_map.size()) {
+ res.cluster_assignment[i] = old_to_new_map[assignment[i]];
+ } else { res.cluster_assignment[i] = -1; }
+ }
+ return res;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
+ inline long long zigzagEncode_scalar(long long n) {
+ return (n << 1) ^ (n >> 63);
+ }
+} // end anonymous namespace
+
+// Corresponds to: cluster_encoder_logic.cpp :: adaptiveGreedyBasePointSelection
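+// Single-pass greedy selection: each point is assigned to the existing base point with the
+// lowest residual bit cost, and is promoted to a new base point only when the bits saved on
+// itself plus on reassignable neighbours exceed the cost of storing it as a base point.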
+inline ClusteringResult adaptiveGreedyBasePointSelection(const SoAData<long long>& page_data, int dim) {
+ auto data_aos = convert_SoA_to_AoS(page_data);
+ size_t n = data_aos.size();
+ if (n == 0) return {{}, {}, {}};
+ std::vector<std::vector<long long>> medoids_list;
+ std::unordered_set<std::vector<long long>, VectorHasher> existing_medoids_set;
+ std::vector<std::unordered_set<int>> points_in_cluster_list;
+ std::vector<int> point_to_leader_map(n, -1);
+ medoids_list.push_back(data_aos[0]);
+ existing_medoids_set.insert(data_aos[0]);
+ points_in_cluster_list.emplace_back();
+ points_in_cluster_list[0].insert(0);
+ point_to_leader_map[0] = 0;
+ for (size_t i = 1; i < n; ++i) {
+ const auto& current_point = data_aos[i];
+ if (existing_medoids_set.count(current_point)) {
+ int existing_leader_index = -1;
+ for (size_t j = 0; j < medoids_list.size(); ++j) { if (medoids_list[j] == current_point) { existing_leader_index = j; break; } }
+ if (existing_leader_index != -1) { points_in_cluster_list[existing_leader_index].insert(i); point_to_leader_map[i] = existing_leader_index; }
+ continue;
+ }
+ int best_leader_index = -1;
+ long long min_cost_to_existing_leader = std::numeric_limits<long long>::max();
+ for (size_t j = 0; j < medoids_list.size(); ++j) {
+ long long cost = calculateTotalLogCost(current_point, medoids_list[j], dim);
+ if (cost < min_cost_to_existing_leader) { min_cost_to_existing_leader = cost; best_leader_index = j; }
+ }
+ long long savings_from_current_point = min_cost_to_existing_leader;
+ long long savings_from_reassignment = 0;
+ const auto& points_in_best_leader_cluster = points_in_cluster_list[best_leader_index];
+ for (int point_index_in_cluster : points_in_best_leader_cluster) {
+ const auto& p = data_aos[point_index_in_cluster];
+ long long cost_to_old_leader = calculateTotalLogCost(p, medoids_list[best_leader_index], dim);
+ long long cost_to_new_potential_leader = calculateTotalLogCost(p, current_point, dim);
+ if (cost_to_new_potential_leader < cost_to_old_leader) { savings_from_reassignment += (cost_to_old_leader - cost_to_new_potential_leader); }
+ }
+ long long total_savings = savings_from_current_point + savings_from_reassignment;
+ long long storage_cost_for_new_point = calculateBasePointStorageCost(current_point, dim);
+ if (total_savings > storage_cost_for_new_point) {
+ int new_leader_id = medoids_list.size();
+ medoids_list.push_back(current_point);
+ existing_medoids_set.insert(current_point);
+ points_in_cluster_list.emplace_back();
+ points_in_cluster_list.back().insert(i);
+ point_to_leader_map[i] = new_leader_id;
+ std::vector<int> points_to_reassign;
+ for (int point_idx : points_in_cluster_list[best_leader_index]) {
+ const auto& p = data_aos[point_idx];
+ long long cost_to_old = calculateTotalLogCost(p, medoids_list[best_leader_index], dim);
+ long long cost_to_new = calculateTotalLogCost(p, current_point, dim);
+ if (cost_to_new < cost_to_old) { points_to_reassign.push_back(point_idx); }
+ }
+ for (int point_idx : points_to_reassign) {
+ points_in_cluster_list[best_leader_index].erase(point_idx);
+ points_in_cluster_list[new_leader_id].insert(point_idx);
+ point_to_leader_map[point_idx] = new_leader_id;
+ }
+ } else {
+ points_in_cluster_list[best_leader_index].insert(i);
+ point_to_leader_map[i] = best_leader_index;
+ }
+ }
+ int k = medoids_list.size();
+ auto discovered_medoids = medoids_list;
+ std::vector<long long> raw_cluster_sizes(k);
+ for (int i = 0; i < k; ++i) { raw_cluster_sizes[i] = points_in_cluster_list[i].size(); }
+ return sortResults(discovered_medoids, point_to_leader_map, raw_cluster_sizes);
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: residualCalculationZigzag_sorted
+inline std::vector<std::vector<long long>> residualCalculationZigzag_sorted(
+ const SoAData<long long>& data, const std::vector<std::vector<long long>>& medoids,
+ const std::vector<int>& assignments, const std::vector<long long>& sorted_cluster_sizes, int dim)
+{
+ int n = data.num_rows;
+ size_t k = medoids.size();
+ if (n == 0) return {};
+ std::vector<int> writePointers(k);
+ int cumulativeCount = 0;
+ for (size_t i = 0; i < k; ++i) {
+ writePointers[i] = cumulativeCount;
+ cumulativeCount += static_cast<int>(sorted_cluster_sizes[i]);
+ }
+ std::vector<std::vector<long long>> sortedResidualSeries(n, std::vector<long long>(dim));
+ for (int i = 0; i < n; i++) {
+ int clusterId = assignments[i];
+ if (clusterId < 0 || static_cast<size_t>(clusterId) >= k) continue;
+ const auto& base = medoids[clusterId];
+ int targetIndex = writePointers[clusterId];
+ for (int j = 0; j < dim; j++) {
+ long long residual = data.columns[j][i] - base[j];
+ sortedResidualSeries[targetIndex][j] = zigzagEncode_scalar(residual);
+ }
+ writePointers[clusterId]++;
+ }
+ return sortedResidualSeries;
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
+namespace {
+ struct BasePointsResult { std::vector<long long> min_base; std::vector<int> max_base_bit_len; BitBuffer base_bitstream; };
+ inline BasePointsResult generateBitstreamEncodedBasePointsOptimized(const std::vector<std::vector<long long>>& medoids, int dim) {
+ if (medoids.empty()) return {std::vector<long long>(dim, 0), std::vector<int>(dim, 0), BitBuffer()};
+ size_t k = medoids.size();
+ std::vector<long long> min_base(dim, std::numeric_limits<long long>::max());
+ for (const auto& point : medoids) for (int i = 0; i < dim; ++i) min_base[i] = std::min(min_base[i], point[i]);
+ std::vector<std::vector<long long>> offset_medoids(k, std::vector<long long>(dim));
+ std::vector<int> max_base_bit_len(dim, 0);
+ for (size_t i = 0; i < k; ++i) {
+ for (int j = 0; j < dim; ++j) {
+ long long offset_value = medoids[i][j] - min_base[j];
+ offset_medoids[i][j] = offset_value;
+ max_base_bit_len[j] = std::max(max_base_bit_len[j], BitBufferUtils::bitsRequiredNoSign(offset_value));
+ }
+ }
+ BitBuffer base_bitstream;
+ for (const auto& offset_point : offset_medoids) for (int i = 0; i < dim; ++i) BitBufferUtils::appendToBitstream(base_bitstream, offset_point[i], max_base_bit_len[i]);
+ return {min_base, max_base_bit_len, base_bitstream};
+ }
+ inline BitBuffer generateBitstreamFrequency(const std::vector<long long>& cluster_size, int block_size) {
+ if (cluster_size.empty()) return BitBuffer();
+ BitBuffer freq_bit, freq_meta;
+ std::vector<long long> cluster_delta(cluster_size.size());
+ if (!cluster_size.empty()) {
+ cluster_delta[0] = cluster_size[0];
+ for (size_t i = 1; i < cluster_size.size(); ++i) cluster_delta[i] = cluster_size[i] - cluster_size[i - 1];
+ }
+ int total_length = cluster_size.size();
+ int num_blocks = (total_length + block_size - 1) / block_size;
+ for (int block_idx = 0; block_idx < num_blocks; ++block_idx) {
+ int start = block_idx * block_size;
+ int end = std::min(start + block_size, total_length);
+ long long max_frequency = 0;
+ for (int i = start; i < end; ++i) max_frequency = std::max(max_frequency, cluster_delta[i]);
+ int max_freq_bit = BitBufferUtils::bitsRequiredNoSign(max_frequency);
+ BitBufferUtils::appendToBitstream(freq_meta, max_freq_bit, 8);
+ for (int i = start; i < end; ++i) BitBufferUtils::appendToBitstream(freq_bit, cluster_delta[i], max_freq_bit);
+ }
+ freq_meta.merge(freq_bit);
+ return freq_meta;
+ }
+ struct EncodedDataResult { BitBuffer residual_bitstream; std::vector<std::vector<int>> pack_res_metadata; int pack_num; int total_data_points; };
+ inline EncodedDataResult generateBitstreamEncodedData(const std::vector<std::vector<long long>>& residual_series, int pack_size, int dim) {
+ int total_data_points = residual_series.size();
+ if (total_data_points == 0) return {BitBuffer(), {}, 0, 0};
+ int pack_num = (total_data_points + pack_size - 1) / pack_size;
+ BitBuffer residual_bit;
+ std::vector<std::vector<int>> pack_res_metadata(pack_num, std::vector<int>(dim));
+ int pack_index = 0;
+ for (int pack_start = 0; pack_start < total_data_points; pack_start += pack_size) {
+ int pack_end = std::min(pack_start + pack_size, total_data_points);
+ std::vector<int> max_residual_bits(dim, 0);
+ for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) max_residual_bits[j] = std::max(max_residual_bits[j], BitBufferUtils::bitsRequiredNoSign(residual_series[i][j]));
+ pack_res_metadata[pack_index] = max_residual_bits;
+ for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) BitBufferUtils::appendToBitstream(residual_bit, residual_series[i][j], max_residual_bits[j]);
+ pack_index++;
+ }
+ return {residual_bit, pack_res_metadata, pack_num, total_data_points};
+ }
+} // end anonymous namespace
+
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromDoubles(const std::vector<double>& page_doubles, int& dim) {
+ dim = 1;
+ if (page_doubles.empty()) return nullptr;
+ int max_decimal_places = 0;
+ for(double val : page_doubles) {
+ std::string s = std::to_string(val);
+ auto dot_pos = s.find('.');
+ if (dot_pos != std::string::npos) {
+ s.erase(s.find_last_not_of('0') + 1, std::string::npos);
+ if (!s.empty() && s.back() == '.') s.pop_back();
+ // Guard against the all-zero-fraction case (e.g. "3.000000" -> "3"), where the
+ // unsigned subtraction below would otherwise underflow.
+ if (s.length() > dot_pos) max_decimal_places = std::max(max_decimal_places, (int)(s.length() - dot_pos - 1));
+ }
+ }
+ double scale = std::pow(10, max_decimal_places);
+ auto integerSoA = std::make_unique<SoAData<long long>>();
+ integerSoA->dim = 1;
+ integerSoA->num_rows = page_doubles.size();
+ integerSoA->columns.resize(1);
+ integerSoA->columns[0].reserve(page_doubles.size());
+ for(double val : page_doubles) {
+ integerSoA->columns[0].push_back(static_cast<long long>(val * scale));
+ }
+ long long min_val_long = 0;
+ if(!integerSoA->columns[0].empty()){
+ min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+ }
+ for (long long& val : integerSoA->columns[0]) {
+ val -= min_val_long;
+ }
+ return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {max_decimal_places}, {min_val_long}});
+}
+
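+// Integer counterpart of preprocessPageFromDoubles, referenced by IntAClusterEncoder::flush
+// below. A minimal sketch under the same conventions: integer input needs no decimal
+// scaling, so max_decimal_places is 0 and values are only offset by the page minimum.
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromLongs(const std::vector<long long>& page_longs, int& dim) {
+ dim = 1;
+ if (page_longs.empty()) return nullptr;
+ auto integerSoA = std::make_unique<SoAData<long long>>();
+ integerSoA->dim = 1;
+ integerSoA->num_rows = static_cast<int>(page_longs.size());
+ integerSoA->columns.resize(1);
+ integerSoA->columns[0] = page_longs;
+ long long min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+ for (long long& val : integerSoA->columns[0]) {
+ val -= min_val_long;
+ }
+ return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {0}, {min_val_long}});
+}
+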
+} // namespace AClusterLogic
+
+namespace storage {
+
+// Base class for holding common data for ACluster encoders.
+template<typename BufferType>
+class AClusterEncoderData {
+protected:
+ AClusterEncoderData() {
+ page_size_ = common::g_config_value_.page_writer_max_point_num_;
+ points_buffer_.reserve(page_size_);
+ }
+
+ void reset_data() {
+ points_buffer_.clear();
+ }
+
+ std::vector<BufferType> points_buffer_;
+ uint32_t page_size_;
+};
+
+// ====================================================================
+// DoubleAClusterEncoder for FLOAT and DOUBLE
+// ====================================================================
+class DoubleAClusterEncoder : public Encoder, private AClusterEncoderData<double> {
+public:
+ DoubleAClusterEncoder() = default;
+ ~DoubleAClusterEncoder() override {}
+
+ void reset() override { reset_data(); }
+
+ int get_max_byte_size() override {
+ return page_size_ * sizeof(double) * 2;
+ }
+
+ // --- Overrides for supported types ---
+ int encode(double value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(value);
+ return common::E_OK;
+ }
+ int encode(float value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(static_cast<double>(value));
+ return common::E_OK;
+ }
+
+ // --- Override for flush ---
+ int flush(common::ByteStream &out_stream) override;
+
+ // --- Overrides for unsupported types ---
+ int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(int32_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(int64_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+// ====================================================================
+// IntAClusterEncoder for INT32 and INT64
+// ====================================================================
+class IntAClusterEncoder : public Encoder, private AClusterEncoderData<long long> {
+public:
+ IntAClusterEncoder() = default;
+ ~IntAClusterEncoder() override {}
+
+ void reset() override { reset_data(); }
+
+ int get_max_byte_size() override {
+ return page_size_ * sizeof(long long) * 2;
+ }
+
+ // --- Overrides for supported types ---
+ int encode(long long value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(value);
+ return common::E_OK;
+ }
+ int encode(int32_t value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(static_cast<long long>(value));
+ return common::E_OK;
+ }
+
+ // --- Override for flush ---
+ int flush(common::ByteStream &out_stream) override;
+
+ // --- Overrides for unsupported types ---
+ int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(float, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(double, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+
+// ====================================================================
+// Implementation of flush methods
+// ====================================================================
+
+// --- FLUSH IMPLEMENTATION FOR DoubleAClusterEncoder ---
+inline int DoubleAClusterEncoder::flush(common::ByteStream &out_stream) {
+ if (points_buffer_.empty()) {
+ return common::E_OK;
+ }
+ try {
+ int dim = 1;
+ int pack_size = 10;
+ int block_size = 10;
+
+ auto preproc_res_ptr = AClusterLogic::preprocessPageFromDoubles(points_buffer_, dim);
+ if (!preproc_res_ptr || !preproc_res_ptr->data) {
+ reset();
+ return common::E_OK;
+ }
+
+ // <<<<<<<<<<<< CORE LOGIC CHANGE >>>>>>>>>>>>
+ // Call ACluster's core clustering algorithm
+ AClusterLogic::ClusteringResult clustering_res = AClusterLogic::adaptiveGreedyBasePointSelection(*preproc_res_ptr->data, dim);
+
+ using namespace AClusterLogic;
+ int page_k = clustering_res.medoids.size();
+ BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+ std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+
+ EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+ BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
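+ // Page layout: [k:16][points:16], then per dim [max_decimals:8][min_value bit_len:8][sign:1][abs],
+ // then per dim [min_base bit_len:8][sign:1][abs][max_base_bit_len:8], then [pack_num:16] and the
+ // per-pack residual bit widths, followed by the base-point, frequency and residual bitstreams.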
+ BitBuffer single_page_bitstream;
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+ for (int j = 0; j < dim; ++j) {
+ BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+ long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ }
+ for (int j = 0; j < dim; ++j) {
+ long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+ }
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+ for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+ single_page_bitstream.merge(basepoint_res.base_bitstream);
+ single_page_bitstream.merge(fre_bitstream);
+ single_page_bitstream.merge(bitstream_res.residual_bitstream);
+
+ std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+ uint32_t data_size = compressed_bytes.size();
+ out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+ if (data_size > 0) {
+ out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+ }
+ } catch (const std::exception& e) {
+ reset();
+ return common::E_ENCODING_ERROR;
+ }
+ reset();
+ return common::E_OK;
+}
+
+// --- FLUSH IMPLEMENTATION FOR IntAClusterEncoder ---
+inline int IntAClusterEncoder::flush(common::ByteStream &out_stream) {
+ if (points_buffer_.empty()) {
+ return common::E_OK;
+ }
+ try {
+ int dim = 1;
+ int pack_size = 10;
+ int block_size = 10;
+
+ auto preproc_res_ptr = AClusterLogic::preprocessPageFromLongs(points_buffer_, dim);
+ if (!preproc_res_ptr || !preproc_res_ptr->data) {
+ reset();
+ return common::E_OK;
+ }
+
+ // <<<<<<<<<<<< CORE LOGIC CHANGE >>>>>>>>>>>>
+ // Call ACluster's core clustering algorithm
+ AClusterLogic::ClusteringResult clustering_res = AClusterLogic::adaptiveGreedyBasePointSelection(*preproc_res_ptr->data, dim);
+
+ using namespace AClusterLogic;
+ int page_k = clustering_res.medoids.size();
+ BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+ std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+
+ EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+ BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
+ BitBuffer single_page_bitstream;
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+ for (int j = 0; j < dim; ++j) {
+ BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+ long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ }
+ for (int j = 0; j < dim; ++j) {
+ long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+ }
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+ for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+ single_page_bitstream.merge(basepoint_res.base_bitstream);
+ single_page_bitstream.merge(fre_bitstream);
+ single_page_bitstream.merge(bitstream_res.residual_bitstream);
+
+ std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+ uint32_t data_size = compressed_bytes.size();
+ out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+ if (data_size > 0) {
+ out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+ }
+ } catch (const std::exception& e) {
+ reset();
+ return common::E_ENCODING_ERROR;
+ }
+ reset();
+ return common::E_OK;
+}
+
+} // namespace storage
+
+#endif // ENCODING_ACLUSTER_ENCODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/decoder_factory.h b/cpp/src/encoding/decoder_factory.h
index 8918c92..0f1b838 100644
--- a/cpp/src/encoding/decoder_factory.h
+++ b/cpp/src/encoding/decoder_factory.h
@@ -24,6 +24,8 @@
#include "gorilla_decoder.h"
#include "plain_decoder.h"
#include "ts2diff_decoder.h"
+#include "kcluster_decoder.h"
+#include "acluster_decoder.h"
namespace storage {
@@ -82,6 +84,24 @@
} else {
ASSERT(false);
}
+ } else if (encoding == common::KCLUSTER) {
+ if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+ ALLOC_AND_RETURN_DECODER(DoubleKClusterDecoder);
+ } else if (data_type == common::INT32 || data_type == common::INT64) {
+ ALLOC_AND_RETURN_DECODER(IntKClusterDecoder);
+ } else {
+ ASSERT(false);
+ return nullptr;
+ }
+ } else if (encoding == common::ACLUSTER) {
+ if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+ ALLOC_AND_RETURN_DECODER(DoubleAClusterDecoder);
+ } else if (data_type == common::INT32 || data_type == common::INT64) {
+ ALLOC_AND_RETURN_DECODER(IntAClusterDecoder);
+ } else {
+ ASSERT(false);
+ return nullptr;
+ }
} else {
// not support now
ASSERT(false);
diff --git a/cpp/src/encoding/encoder_factory.h b/cpp/src/encoding/encoder_factory.h
index 0e582ae..b9f428d 100644
--- a/cpp/src/encoding/encoder_factory.h
+++ b/cpp/src/encoding/encoder_factory.h
@@ -25,6 +25,9 @@
#include "gorilla_encoder.h"
#include "plain_encoder.h"
#include "ts2diff_encoder.h"
+#include "kcluster_encoder.h"
+#include "acluster_encoder.h"
+
namespace storage {
@@ -111,6 +114,22 @@
return nullptr;
} else if (encoding == common::FREQ) {
return nullptr;
+ } else if (encoding == common::KCLUSTER) {
+ if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+ ALLOC_AND_RETURN_ENCODER(DoubleKClusterEncoder);
+ } else if (data_type == common::INT32 || data_type == common::INT64) {
+ ALLOC_AND_RETURN_ENCODER(IntKClusterEncoder);
+ } else {
+ ASSERT(false);
+ }
+ } else if (encoding == common::ACLUSTER) {
+ if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+ ALLOC_AND_RETURN_ENCODER(DoubleAClusterEncoder);
+ } else if (data_type == common::INT32 || data_type == common::INT64) {
+ ALLOC_AND_RETURN_ENCODER(IntAClusterEncoder);
+ } else {
+ ASSERT(false);
+ }
} else {
// not support now
ASSERT(false);
diff --git a/cpp/src/encoding/kcluster_decoder.h b/cpp/src/encoding/kcluster_decoder.h
new file mode 100644
index 0000000..07a0dc2
--- /dev/null
+++ b/cpp/src/encoding/kcluster_decoder.h
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ENCODING_KCLUSTER_DECODER_H
+#define ENCODING_KCLUSTER_DECODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <stdexcept>
+#include <limits>
+#include <cstdint>
+
+#include "decoder.h"
+#include "common/allocator/byte_stream.h"
+
+// ===================================================================
+// Part 1: Core KCluster Decoding Logic
+// ===================================================================
+namespace KClusterDecodeLogic {
+
+// Corresponds to: cluster_encoder_logic.cpp :: BitStreamReader
+class BitStreamReader {
+public:
+ explicit BitStreamReader(const std::vector<uint8_t>& data)
+ : buffer(data), bufferSize(data.size() * 8), currentBitPos(0) {}
+
+ inline long long readBits(int numBits) {
+ if (numBits < 0 || numBits > 64) throw std::invalid_argument("Cannot read " + std::to_string(numBits) + " bits.");
+ if (currentBitPos + numBits > bufferSize) throw std::out_of_range("Not enough bits in stream to read " + std::to_string(numBits) + " bits.");
+
+ long long value = 0;
+ for (int i = 0; i < numBits; ++i) {
+ size_t byteIndex = currentBitPos / 8;
+ int bitIndex = 7 - (currentBitPos % 8);
+ bool bit = (buffer[byteIndex] & (1 << bitIndex)) != 0;
+ value = (value << 1) | (bit ? 1LL : 0LL);
+ currentBitPos++;
+ }
+ return value;
+ }
+
+ inline bool has_more() const {
+ return currentBitPos < bufferSize;
+ }
+
+private:
+ const std::vector<uint8_t>& buffer;
+ const size_t bufferSize;
+ size_t currentBitPos;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp :: zigzagDecode
+inline long long zigzagDecode(long long n) { return (n >> 1) ^ (-(n & 1)); }
+
+// Corresponds to: cluster_encoder_logic.cpp :: decode_multidim_impl
+inline std::vector<std::vector<double>> decode_multidim_impl(const std::vector<uint8_t>& compressed_data) {
+ if (compressed_data.empty()) return {};
+ BitStreamReader reader(compressed_data);
+
+ int dim = reader.readBits(8);
+ int pack_size = reader.readBits(16);
+ int block_size = reader.readBits(16);
+ int page_count = reader.readBits(32);
+ int page_size = reader.readBits(16);
+ (void)page_size; // page_size is part of the header but not needed during decoding (pack_size is used below)
+
+ std::vector<double> pow10_lookup; for (int i = 0; i < 20; ++i) pow10_lookup.push_back(std::pow(10, i));
+ std::vector<std::vector<double>> all_data_rows;
+
+ for (int i = 0; i < page_count; ++i) {
+ int k = reader.readBits(16);
+ int page_data_points = reader.readBits(16);
+
+ std::vector<int> max_decimals(dim);
+ std::vector<long long> min_values(dim);
+ for (int j = 0; j < dim; ++j) {
+ max_decimals[j] = reader.readBits(8);
+ int bit_len = reader.readBits(8);
+ long long sign = reader.readBits(1);
+ long long abs_val = reader.readBits(bit_len);
+ min_values[j] = (sign == 0) ? abs_val : -abs_val;
+ }
+
+ std::vector<long long> min_bases(dim);
+ std::vector<int> max_base_bit_len(dim);
+ for (int j = 0; j < dim; ++j) {
+ int bit_len = reader.readBits(8);
+ long long sign = reader.readBits(1);
+ long long abs_val = reader.readBits(bit_len);
+ min_bases[j] = (sign == 0) ? abs_val : -abs_val;
+ max_base_bit_len[j] = reader.readBits(8);
+ }
+
+ int pack_num = reader.readBits(16);
+ std::vector<std::vector<int>> pack_metadata(pack_num, std::vector<int>(dim));
+ for (int p = 0; p < pack_num; ++p) for (int j = 0; j < dim; ++j) pack_metadata[p][j] = reader.readBits(8);
+
+ std::vector<std::vector<long long>> medoids_long(k, std::vector<long long>(dim));
+ for (int m = 0; m < k; ++m) for (int j = 0; j < dim; ++j) medoids_long[m][j] = reader.readBits(max_base_bit_len[j]) + min_bases[j];
+
+ std::vector<long long> cluster_sizes(k, 0);
+ if (k > 0) {
+ int num_freq_blocks = (k + block_size - 1) / block_size;
+ std::vector<int> max_bits_per_block(num_freq_blocks);
+ for (int b = 0; b < num_freq_blocks; ++b) max_bits_per_block[b] = reader.readBits(8);
+ std::vector<long long> deltas(k);
+ int freq_idx = 0;
+ for (int b = 0; b < num_freq_blocks && freq_idx < k; ++b) {
+ int max_bit_for_this_block = max_bits_per_block[b];
+ int end = std::min((b + 1) * block_size, k);
+ for (int j = freq_idx; j < end; ++j) deltas[j] = reader.readBits(max_bit_for_this_block);
+ freq_idx = end;
+ }
+ cluster_sizes[0] = deltas[0];
+ for (size_t j = 1; j < deltas.size(); ++j) cluster_sizes[j] = cluster_sizes[j - 1] + deltas[j];
+ }
+
+ std::vector<std::vector<long long>> residual_series(page_data_points, std::vector<long long>(dim));
+ int data_counter = 0;
+ if (page_data_points > 0) {
+ for (int p = 0; p < pack_num; ++p) {
+ const auto& bits = pack_metadata[p];
+ int points_in_pack = (p == pack_num - 1) ? (page_data_points - (p * pack_size)) : pack_size;
+ for (int pt = 0; pt < points_in_pack; ++pt) {
+ if (data_counter >= page_data_points) break;
+ for (int j = 0; j < dim; ++j) residual_series[data_counter][j] = zigzagDecode(reader.readBits(bits[j]));
+ data_counter++;
+ }
+ }
+ }
+
+ if (page_data_points > 0 && k > 0) {
+ int current_point_idx = 0;
+ for (int medoid_idx = 0; medoid_idx < k; ++medoid_idx) {
+ long long points_in_this_cluster = cluster_sizes[medoid_idx];
+ const auto& base_point = medoids_long[medoid_idx];
+ for (int p_count = 0; p_count < points_in_this_cluster; ++p_count) {
+ if (current_point_idx < page_data_points) {
+ std::vector<double> row(dim);
+ for (int j = 0; j < dim; ++j) {
+ long long int_val = base_point[j] + residual_series[current_point_idx][j] + min_values[j];
+ row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+ }
+ all_data_rows.push_back(row);
+ current_point_idx++;
+ }
+ }
+ }
+ } else if (page_data_points > 0) {
+ for (int d = 0; d < page_data_points; ++d) {
+ std::vector<double> row(dim);
+ for (int j = 0; j < dim; ++j) {
+ long long int_val = residual_series[d][j] + min_values[j];
+ row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+ }
+ all_data_rows.push_back(row);
+ }
+ }
+ }
+ return all_data_rows;
+}
+
+} // namespace KClusterDecodeLogic
+
+
+// ===================================================================
+// Part 2: The Decoder classes that adapt the logic for TsFile
+// ===================================================================
+namespace storage {
+
+template<typename T>
+class KClusterDecoderBase : public Decoder {
+public:
+ KClusterDecoderBase() : current_read_index_(0), decoded_(false) {}
+ ~KClusterDecoderBase() override {}
+
+ int read_boolean(bool&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_int32(int32_t&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_int64(int64_t&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_float(float&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_double(double&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+ int read_String(common::String&, common::PageArena&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+
+protected:
+ void decode_page_if_needed(common::ByteStream &in);
+ std::vector<T> decoded_points_;
+ size_t current_read_index_;
+ bool decoded_;
+};
+
+class DoubleKClusterDecoder : public KClusterDecoderBase<double> {
+public:
+ int read_double(double& val, common::ByteStream& in) override;
+ int read_float(float& val, common::ByteStream& in) override;
+};
+
+class IntKClusterDecoder : public KClusterDecoderBase<long long> {
+public:
+ int read_int64(long long& val, common::ByteStream& in) override;
+ int read_int32(int32_t& val, common::ByteStream& in) override;
+};
+
+
+// --- Implementation ---
+template<typename T>
+inline void KClusterDecoderBase<T>::decode_page_if_needed(common::ByteStream &in) {
+ if (decoded_ || !in.has_remaining()) return;
+
+ uint32_t data_size = 0;
+ uint32_t read_len = 0;
+ in.read_buf(reinterpret_cast<char*>(&data_size), sizeof(data_size), read_len);
+ if (read_len != sizeof(data_size)) throw std::runtime_error("Failed to read KCluster data block size");
+ if (data_size == 0) {
+ decoded_ = true;
+ return;
+ }
+
+ std::vector<uint8_t> compressed_block(data_size);
+ in.read_buf(reinterpret_cast<char*>(compressed_block.data()), data_size, read_len);
+ if (read_len != data_size) throw std::runtime_error("Failed to read KCluster data block");
+
+ KClusterLogic::BitBuffer fake_bytestream;
+ int dim = 1, pack_size = 10, block_size = 10, page_count = 1, page_size = 0;
+ KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, dim, 8);
+ KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, pack_size, 16);
+ KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, block_size, 16);
+ KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_count, 32);
+ KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_size, 16);
+
+ std::vector<uint8_t> final_bytes_to_decode = fake_bytestream.toByteArray();
+ final_bytes_to_decode.insert(final_bytes_to_decode.end(), compressed_block.begin(), compressed_block.end());
+
+ std::vector<std::vector<double>> result_multidim = KClusterDecodeLogic::decode_multidim_impl(final_bytes_to_decode);
+
+ if (!result_multidim.empty()) {
+ decoded_points_.reserve(result_multidim.size());
+ for(const auto& row : result_multidim) {
+ if (!row.empty()) {
+ decoded_points_.push_back(static_cast<T>(row[0]));
+ }
+ }
+ }
+ decoded_ = true;
+}
+
+inline int DoubleKClusterDecoder::read_double(double& val, common::ByteStream& in) {
+ try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+ if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+ val = decoded_points_[current_read_index_++];
+ return common::E_OK;
+}
+inline int DoubleKClusterDecoder::read_float(float& val, common::ByteStream& in) {
+ double d_val;
+ int ret = read_double(d_val, in);
+ if (ret == common::E_OK) val = static_cast<float>(d_val);
+ return ret;
+}
+
+inline int IntKClusterDecoder::read_int64(long long& val, common::ByteStream& in) {
+ try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+ if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+ val = decoded_points_[current_read_index_++];
+ return common::E_OK;
+}
+inline int IntKClusterDecoder::read_int32(int32_t& val, common::ByteStream& in) {
+ long long ll_val;
+ int ret = read_int64(ll_val, in);
+ if (ret == common::E_OK) {
+ if (ll_val < std::numeric_limits<int32_t>::min() || ll_val > std::numeric_limits<int32_t>::max()) {
+ return common::E_DECODING_ERROR;
+ }
+ val = static_cast<int32_t>(ll_val);
+ }
+ return ret;
+}
+
+} // namespace storage
+#endif // ENCODING_KCLUSTER_DECODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/kcluster_encoder.h b/cpp/src/encoding/kcluster_encoder.h
new file mode 100644
index 0000000..0da8f41
--- /dev/null
+++ b/cpp/src/encoding/kcluster_encoder.h
@@ -0,0 +1,834 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#ifndef ENCODING_KCLUSTER_ENCODER_H
+#define ENCODING_KCLUSTER_ENCODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <numeric>
+#include <algorithm>
+#include <stdexcept>
+#include <limits>
+#include <unordered_set>
+#include <random>
+#include <sstream>
+#include <functional>
+
+#include "encoder.h"
+#include "common/allocator/byte_stream.h"
+#include "common/global.h"
+
+// ===================================================================
+// Part 1: Core KCluster Algorithm Logic
+// All implementations are encapsulated in the KClusterLogic namespace
+// ===================================================================
+namespace KClusterLogic {
+
+// Corresponds to: algorithms/cluster_common.h
+template<typename T>
+struct SoAData {
+ std::vector<std::vector<T>> columns;
+ int dim = 0;
+ int num_rows = 0;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct ClusteringResult {
+ std::vector<std::vector<long long>> medoids;
+ std::vector<int> cluster_assignment;
+ std::vector<long long> cluster_sizes;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct PreprocessorResult {
+ std::unique_ptr<SoAData<long long>> data;
+ std::vector<int> max_decimal_places;
+ std::vector<long long> min_values;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp
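+// Append-only bit buffer used for all serialization below; bits are packed MSB-first within each byte.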
+struct BitBuffer {
+ std::vector<uint8_t> buffer;
+ size_t currentBitPos = 0;
+
+ inline void appendBit(bool bit) {
+ size_t byteIndex = currentBitPos / 8;
+ int bitIndexInByte = currentBitPos % 8;
+ if (byteIndex >= buffer.size()) buffer.push_back(0);
+ if (bit) buffer[byteIndex] |= (1 << (7 - bitIndexInByte));
+ currentBitPos++;
+ }
+
+ inline void merge(const BitBuffer& other) {
+ for (size_t i = 0; i < other.currentBitPos; ++i) {
+ size_t byteIndex = i / 8;
+ int bitOffset = 7 - (i % 8);
+ bool bit = (other.buffer[byteIndex] & (1 << bitOffset)) != 0;
+ appendBit(bit);
+ }
+ }
+
+ inline std::vector<uint8_t> toByteArray() const { return buffer; }
+ inline size_t size() const { return currentBitPos; }
+};
+
+namespace BitBufferUtils {
+ // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
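+ // Appends the low `bits` bits of `num`, most significant bit first, so the decoder can read them back with fixed-width reads.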
+ inline void appendToBitstream(BitBuffer& bitBuffer, long long num, int bits) {
+ for (int i = bits - 1; i >= 0; --i) bitBuffer.appendBit((num >> i) & 1);
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
+ inline int bitsRequiredNoSign(long long value) {
+ if (value == 0) return 1;
+ unsigned long long u_value = (value > 0) ? value : -value;
+ #if defined(__GNUC__) || defined(__clang__)
+ return 64 - __builtin_clzll(u_value);
+ #else
+ int length = 0; while(u_value > 0){ u_value >>= 1; length++; } return length == 0 ? 1 : length;
+ #endif
+ }
+}
+
+namespace { // Anonymous namespace for internal linkage helpers
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for Preprocessor
+ inline int bitLength(long long value) {
+ if (value == 0) return 1;
+ unsigned long long u_value = (value > 0) ? value : -value;
+ #if defined(__GNUC__) || defined(__clang__)
+ return 64 - __builtin_clzll(u_value);
+ #else
+ int length = 0; while (u_value > 0) { u_value >>= 1; length++; } return length;
+ #endif
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline long long calculateTotalLogCost(const std::vector<long long>& p1, const std::vector<long long>& p2, int dim) {
+ long long totalCost = 0;
+ for (int i = 0; i < dim; ++i) {
+ long long residual = p1[i] - p2[i];
+ totalCost += bitLength(residual >= 0 ? residual : -residual) + 1;
+ }
+ return totalCost;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline long long manhattanDistance(const std::vector<long long>& p1, const std::vector<long long>& p2) {
+ long long sum = 0;
+ for (size_t i = 0; i < p1.size(); ++i) { sum += std::abs(p1[i] - p2[i]); }
+ return sum;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline std::vector<long long> get_point_from_soa(const SoAData<long long>& soa_data, int row_idx) {
+ std::vector<long long> point(soa_data.dim);
+ for (int d = 0; d < soa_data.dim; ++d) {
+ point[d] = soa_data.columns[d][row_idx];
+ }
+ return point;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline std::vector<std::vector<long long>> convert_SoA_to_AoS(const SoAData<long long>& soa_data) {
+ if (soa_data.num_rows == 0) return {};
+ std::vector<std::vector<long long>> aos_data(soa_data.num_rows, std::vector<long long>(soa_data.dim));
+ for (int j = 0; j < soa_data.dim; ++j) {
+ for (int i = 0; i < soa_data.num_rows; ++i) {
+ aos_data[i][j] = soa_data.columns[j][i];
+ }
+ }
+ return aos_data;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ struct VectorHasher {
+ inline std::size_t operator()(const std::vector<long long>& vec) const {
+ std::size_t seed = vec.size();
+ for(long long i : vec) { seed ^= std::hash<long long>()(i) + 0x9e3779b9 + (seed << 6) + (seed >> 2); }
+ return seed;
+ }
+ };
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ struct MedSortHelper {
+ std::vector<long long> medoid;
+ long long size;
+ int original_index;
+ inline bool operator<(const MedSortHelper& other) const { return size < other.size; }
+ };
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+ inline ClusteringResult sortResults(std::vector<std::vector<long long>>& medoids, std::vector<int>& assignment, std::vector<long long>& sizes) {
+ int k = medoids.size();
+ if (k == 0) return {{}, {}, {}};
+ std::vector<MedSortHelper> sorters;
+ sorters.reserve(k);
+ for(int i = 0; i < k; ++i) { sorters.push_back({medoids[i], sizes[i], i}); }
+ std::sort(sorters.begin(), sorters.end());
+ ClusteringResult res;
+ res.medoids.resize(k);
+ res.cluster_sizes.resize(k);
+ std::vector<int> old_to_new_map(k);
+ for(int i = 0; i < k; ++i) {
+ res.medoids[i] = sorters[i].medoid;
+ res.cluster_sizes[i] = sorters[i].size;
+ old_to_new_map[sorters[i].original_index] = i;
+ }
+ res.cluster_assignment.resize(assignment.size());
+ for(size_t i = 0; i < assignment.size(); ++i) {
+ if (assignment[i] != -1 && static_cast<size_t>(assignment[i]) < old_to_new_map.size()) {
+ res.cluster_assignment[i] = old_to_new_map[assignment[i]];
+ } else { res.cluster_assignment[i] = -1; }
+ }
+ return res;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
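+ // Maps signed residuals to non-negative codes (0, -1, 1, -2, 2, ... -> 0, 1, 2, 3, 4, ...) so small magnitudes need few bits.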
+ inline long long zigzagEncode_scalar(long long n) {
+ return (n << 1) ^ (n >> 63);
+ }
+} // end anonymous namespace
+
+
+// Corresponds to: cluster_encoder_logic.cpp :: acceleratedInitialization
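+// K-means++-style seeding: the first medoid is drawn uniformly, each further medoid with probability proportional to its Manhattan distance from the nearest medoid chosen so far; duplicate points are skipped.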
+inline std::vector<std::vector<long long>> acceleratedInitialization(const SoAData<long long>& data, int k, std::mt19937& gen) {
+ std::vector<std::vector<long long>> medoids;
+ if (data.num_rows == 0 || k <= 0) return medoids;
+ medoids.reserve(k);
+ std::unordered_set<std::vector<long long>, VectorHasher> selected_medoids_set;
+ std::uniform_int_distribution<> distrib(0, data.num_rows - 1);
+ int first_index = distrib(gen);
+ auto first_medoid = get_point_from_soa(data, first_index);
+ medoids.push_back(first_medoid);
+ selected_medoids_set.insert(first_medoid);
+ std::vector<long long> distances(data.num_rows);
+ for (int i = 0; i < data.num_rows; ++i) {
+ distances[i] = manhattanDistance(get_point_from_soa(data, i), medoids[0]);
+ }
+ for (int i = 1; i < k; ++i) {
+ if (selected_medoids_set.size() >= (size_t)data.num_rows) break;
+ long long total_distance = std::accumulate(distances.begin(), distances.end(), 0LL);
+ if (total_distance == 0) {
+ int next_idx = 0;
+ while(selected_medoids_set.count(get_point_from_soa(data, next_idx))) {
+ next_idx = (next_idx + 1) % data.num_rows;
+ }
+ auto new_medoid = get_point_from_soa(data, next_idx);
+ medoids.push_back(new_medoid);
+ selected_medoids_set.insert(new_medoid);
+ continue;
+ }
+ std::uniform_real_distribution<double> dist_double(0.0, (double)total_distance);
+ long long rand_val = static_cast<long long>(dist_double(gen));
+ long long current_sum = 0;
+ int chosen_idx = -1;
+ for(int j = 0; j < data.num_rows; ++j) {
+ current_sum += distances[j];
+ if (current_sum >= rand_val) {
+ chosen_idx = j;
+ break;
+ }
+ }
+ if (chosen_idx == -1) chosen_idx = data.num_rows - 1;
+ while(selected_medoids_set.count(get_point_from_soa(data, chosen_idx))) {
+ chosen_idx = (chosen_idx + 1) % data.num_rows;
+ }
+ auto new_medoid = get_point_from_soa(data, chosen_idx);
+ medoids.push_back(new_medoid);
+ selected_medoids_set.insert(new_medoid);
+ for (int idx = 0; idx < data.num_rows; ++idx) {
+ long long dist_new = manhattanDistance(get_point_from_soa(data, idx), new_medoid);
+ distances[idx] = std::min(distances[idx], dist_new);
+ }
+ }
+ return medoids;
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: updateMedoids
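+// PAM-style update step: within each cluster, the member whose residuals against all other members need the fewest total bits becomes the new medoid.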
+inline std::vector<std::vector<long long>> updateMedoids(const SoAData<long long>& data, const std::vector<int>& clusters, const std::vector<std::vector<long long>>& currentMedoids, int dim) {
+ int k = currentMedoids.size();
+ if (k == 0) return {};
+ std::vector<std::vector<long long>> newMedoids(k, std::vector<long long>(dim));
+ std::vector<std::vector<int>> cluster_point_indices(k);
+ for (int i = 0; i < data.num_rows; ++i) {
+ if (clusters[i] != -1 && clusters[i] < k) { cluster_point_indices[clusters[i]].push_back(i); }
+ }
+ for (int medoidIndex = 0; medoidIndex < k; ++medoidIndex) {
+ const auto& member_indices = cluster_point_indices[medoidIndex];
+ if (member_indices.empty()) { newMedoids[medoidIndex] = currentMedoids[medoidIndex]; continue; }
+ long long best_total_cost = std::numeric_limits<long long>::max();
+ int best_medoid_candidate_idx_in_data = -1;
+ for (int candidate_idx_in_data : member_indices) {
+ long long current_total_cost = 0;
+ auto candidate_medoid = get_point_from_soa(data, candidate_idx_in_data);
+ for (int member_idx_in_data : member_indices) {
+ if (candidate_idx_in_data == member_idx_in_data) continue;
+ auto other_point = get_point_from_soa(data, member_idx_in_data);
+ for(int d = 0; d < dim; ++d) {
+ long long residual = candidate_medoid[d] - other_point[d];
+ current_total_cost += bitLength(residual >= 0 ? residual : -residual) + 1;
+ }
+ }
+ if (current_total_cost < best_total_cost) {
+ best_total_cost = current_total_cost;
+ best_medoid_candidate_idx_in_data = candidate_idx_in_data;
+ }
+ }
+ if (best_medoid_candidate_idx_in_data != -1) {
+ newMedoids[medoidIndex] = get_point_from_soa(data, best_medoid_candidate_idx_in_data);
+ } else { newMedoids[medoidIndex] = currentMedoids[medoidIndex]; }
+ }
+ return newMedoids;
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: kMedoidLogCost
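+// K-medoids with a compression-oriented cost: assigning a point to a medoid costs the bits needed to store the per-dimension residuals. k is capped at the number of distinct points, and the resulting clusters are sorted by size.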
+inline ClusteringResult kMedoidLogCost(const SoAData<long long>& data, int k, int max_iter, double tol, int dim) {
+ int n = data.num_rows;
+ if (n == 0) return {{}, {}, {}};
+ auto aos_data = convert_SoA_to_AoS(data);
+ std::sort(aos_data.begin(), aos_data.end());
+ auto last = std::unique(aos_data.begin(), aos_data.end());
+ int distinctCount = std::distance(aos_data.begin(), last);
+ if (distinctCount < k) k = distinctCount;
+
+ std::random_device rd; std::mt19937 gen(rd());
+ auto medoids = acceleratedInitialization(data, k, gen);
+ k = medoids.size();
+ if (k==0) return {};
+
+ std::vector<int> cluster_assignment(n);
+ long long previous_total_cost = std::numeric_limits<long long>::max();
+
+ for (int iter = 0; iter < max_iter; ++iter) {
+ long long total_cost_this_round = 0;
+ for (int i = 0; i < n; ++i) {
+ long long min_cost = std::numeric_limits<long long>::max();
+ int assigned_medoid_idx = -1;
+ auto current_point = get_point_from_soa(data, i);
+ for (int m = 0; m < k; ++m) {
+ long long cost = calculateTotalLogCost(current_point, medoids[m], dim);
+ if (cost < min_cost) {
+ min_cost = cost;
+ assigned_medoid_idx = m;
+ } else if (cost == min_cost) {
+ if (medoids[m] == current_point) {
+ assigned_medoid_idx = m;
+ break;
+ }
+ }
+ }
+ cluster_assignment[i] = assigned_medoid_idx;
+ total_cost_this_round += min_cost;
+ }
+ if (iter > 0 && std::abs((double)previous_total_cost - total_cost_this_round) < tol) break;
+ previous_total_cost = total_cost_this_round;
+ auto new_medoids = updateMedoids(data, cluster_assignment, medoids, dim);
+ if (medoids == new_medoids) break;
+ medoids = new_medoids;
+ }
+ std::vector<long long> cluster_sizes(k, 0);
+ for (int assignment : cluster_assignment) { if (assignment != -1 && (size_t)assignment < cluster_sizes.size()) cluster_sizes[assignment]++; }
+ return sortResults(medoids, cluster_assignment, cluster_sizes);
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: residualCalculationZigzag_sorted
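+// Reorders points so that members of the same cluster are contiguous (clusters in sorted order) and stores each point's zigzag-encoded residual against its cluster medoid.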
+inline std::vector<std::vector<long long>> residualCalculationZigzag_sorted(
+ const SoAData<long long>& data, const std::vector<std::vector<long long>>& medoids,
+ const std::vector<int>& assignments, const std::vector<long long>& sorted_cluster_sizes, int dim)
+{
+ int n = data.num_rows;
+ size_t k = medoids.size();
+ if (n == 0) return {};
+ std::vector<int> writePointers(k);
+ int cumulativeCount = 0;
+ for (size_t i = 0; i < k; ++i) {
+ writePointers[i] = cumulativeCount;
+ cumulativeCount += static_cast<int>(sorted_cluster_sizes[i]);
+ }
+ std::vector<std::vector<long long>> sortedResidualSeries(n, std::vector<long long>(dim));
+ for (int i = 0; i < n; i++) {
+ int clusterId = assignments[i];
+ if (clusterId < 0 || static_cast<size_t>(clusterId) >= k) continue;
+ const auto& base = medoids[clusterId];
+ int targetIndex = writePointers[clusterId];
+ for (int j = 0; j < dim; j++) {
+ long long residual = data.columns[j][i] - base[j];
+ sortedResidualSeries[targetIndex][j] = zigzagEncode_scalar(residual);
+ }
+ writePointers[clusterId]++;
+ }
+ return sortedResidualSeries;
+}
+
+namespace {
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
+ struct BasePointsResult { std::vector<long long> min_base; std::vector<int> max_base_bit_len; BitBuffer base_bitstream; };
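+ // Medoids are stored as offsets from the per-dimension minimum, bit-packed with each dimension's maximum offset width.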
+ inline BasePointsResult generateBitstreamEncodedBasePointsOptimized(const std::vector<std::vector<long long>>& medoids, int dim) {
+ if (medoids.empty()) return {std::vector<long long>(dim, 0), std::vector<int>(dim, 0), BitBuffer()};
+ size_t k = medoids.size();
+ std::vector<long long> min_base(dim, std::numeric_limits<long long>::max());
+ for (const auto& point : medoids) for (int i = 0; i < dim; ++i) min_base[i] = std::min(min_base[i], point[i]);
+ std::vector<std::vector<long long>> offset_medoids(k, std::vector<long long>(dim));
+ std::vector<int> max_base_bit_len(dim, 0);
+ for (size_t i = 0; i < k; ++i) {
+ for (int j = 0; j < dim; ++j) {
+ long long offset_value = medoids[i][j] - min_base[j];
+ offset_medoids[i][j] = offset_value;
+ max_base_bit_len[j] = std::max(max_base_bit_len[j], BitBufferUtils::bitsRequiredNoSign(offset_value));
+ }
+ }
+ BitBuffer base_bitstream;
+ for (const auto& offset_point : offset_medoids) for (int i = 0; i < dim; ++i) BitBufferUtils::appendToBitstream(base_bitstream, offset_point[i], max_base_bit_len[i]);
+ return {min_base, max_base_bit_len, base_bitstream};
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
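+ // Cluster sizes arrive sorted ascending and are delta-encoded; the deltas are bit-packed in blocks, with one 8-bit width per block written first (freq_meta) followed by all the deltas (freq_bit).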
+ inline BitBuffer generateBitstreamFrequency(const std::vector<long long>& cluster_size, int block_size) {
+ if (cluster_size.empty()) return BitBuffer();
+ BitBuffer freq_bit, freq_meta;
+        std::vector<long long> cluster_delta(cluster_size.size());
+        cluster_delta[0] = cluster_size[0];
+        for (size_t i = 1; i < cluster_size.size(); ++i) cluster_delta[i] = cluster_size[i] - cluster_size[i - 1];
+ int total_length = cluster_size.size();
+ int num_blocks = (total_length + block_size - 1) / block_size;
+ for (int block_idx = 0; block_idx < num_blocks; ++block_idx) {
+ int start = block_idx * block_size;
+ int end = std::min(start + block_size, total_length);
+ long long max_frequency = 0;
+ for (int i = start; i < end; ++i) max_frequency = std::max(max_frequency, cluster_delta[i]);
+ int max_freq_bit = BitBufferUtils::bitsRequiredNoSign(max_frequency);
+ BitBufferUtils::appendToBitstream(freq_meta, max_freq_bit, 8);
+ for (int i = start; i < end; ++i) BitBufferUtils::appendToBitstream(freq_bit, cluster_delta[i], max_freq_bit);
+ }
+ freq_meta.merge(freq_bit);
+ return freq_meta;
+ }
+ // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
+ struct EncodedDataResult { BitBuffer residual_bitstream; std::vector<std::vector<int>> pack_res_metadata; int pack_num; int total_data_points; };
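+ // Residuals are bit-packed in groups of pack_size rows; each pack records its per-dimension maximum bit width in pack_res_metadata and packs every value in the group with that fixed width.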
+ inline EncodedDataResult generateBitstreamEncodedData(const std::vector<std::vector<long long>>& residual_series, int pack_size, int dim) {
+ int total_data_points = residual_series.size();
+ if (total_data_points == 0) return {BitBuffer(), {}, 0, 0};
+ int pack_num = (total_data_points + pack_size - 1) / pack_size;
+ BitBuffer residual_bit;
+ std::vector<std::vector<int>> pack_res_metadata(pack_num, std::vector<int>(dim));
+ int pack_index = 0;
+ for (int pack_start = 0; pack_start < total_data_points; pack_start += pack_size) {
+ int pack_end = std::min(pack_start + pack_size, total_data_points);
+ std::vector<int> max_residual_bits(dim, 0);
+ for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) max_residual_bits[j] = std::max(max_residual_bits[j], BitBufferUtils::bitsRequiredNoSign(residual_series[i][j]));
+ pack_res_metadata[pack_index] = max_residual_bits;
+ for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) BitBufferUtils::appendToBitstream(residual_bit, residual_series[i][j], max_residual_bits[j]);
+ pack_index++;
+ }
+ return {residual_bit, pack_res_metadata, pack_num, total_data_points};
+ }
+} // end anonymous namespace
+
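+// Scales doubles to integers using 10^(maximum observed decimal places) and shifts by the page minimum so all stored values are non-negative deltas.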
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromDoubles(const std::vector<double>& page_doubles, int& dim) {
+ dim = 1;
+ if (page_doubles.empty()) return nullptr;
+ int max_decimal_places = 0;
+ for(double val : page_doubles) {
+ std::string s = std::to_string(val);
+ auto dot_pos = s.find('.');
+ if (dot_pos != std::string::npos) {
+ s.erase(s.find_last_not_of('0') + 1, std::string::npos);
+ if (!s.empty() && s.back() == '.') s.pop_back();
+ max_decimal_places = std::max(max_decimal_places, (int)(s.length() - dot_pos - 1));
+ }
+ }
+ double scale = std::pow(10, max_decimal_places);
+ auto integerSoA = std::make_unique<SoAData<long long>>();
+ integerSoA->dim = 1;
+ integerSoA->num_rows = page_doubles.size();
+ integerSoA->columns.resize(1);
+ integerSoA->columns[0].reserve(page_doubles.size());
+ for(double val : page_doubles) {
+ integerSoA->columns[0].push_back(static_cast<long long>(val * scale));
+ }
+ long long min_val_long = 0;
+ if(!integerSoA->columns[0].empty()){
+ min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+ }
+ for (long long& val : integerSoA->columns[0]) {
+ val -= min_val_long;
+ }
+ return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {max_decimal_places}, {min_val_long}});
+}
+
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromLongs(const std::vector<long long>& page_longs, int& dim) {
+ dim = 1;
+ if (page_longs.empty()) return nullptr;
+
+ auto integerSoA = std::make_unique<SoAData<long long>>();
+ integerSoA->dim = 1;
+ integerSoA->num_rows = page_longs.size();
+ integerSoA->columns.resize(1);
+ integerSoA->columns[0] = page_longs; // Directly copy the data
+
+ long long min_val_long = 0;
+ if(!integerSoA->columns[0].empty()){
+ min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+ }
+ for (long long& val : integerSoA->columns[0]) {
+ val -= min_val_long;
+ }
+
+ // For INT types, decimal places is 0.
+ return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {0}, {min_val_long}});
+}
+
+} // namespace KClusterLogic
+
+
+
+namespace storage {
+
+template<typename BufferType>
+class KClusterEncoderData {
+protected:
+ KClusterEncoderData() {
+ page_size_ = common::g_config_value_.page_writer_max_point_num_;
+ points_buffer_.reserve(page_size_);
+ k_ = 100;
+ }
+
+ void reset_data() {
+ points_buffer_.clear();
+ }
+
+ std::vector<BufferType> points_buffer_;
+ uint32_t page_size_;
+ int k_;
+};
+
+template<typename T, typename BufferType>
+class KClusterEncoderBase : public Encoder {
+public:
+ KClusterEncoderBase() {
+ page_size_ = common::g_config_value_.page_writer_max_point_num_;
+ points_buffer_.reserve(page_size_);
+ k_ = 10;
+ }
+ ~KClusterEncoderBase() override {}
+
+ void reset() {
+ points_buffer_.clear();
+ }
+
+ int get_max_byte_size() override {
+ return page_size_ * sizeof(BufferType) * 2;
+ }
+
+ int encode(T value, common::ByteStream &out_stream) {
+ points_buffer_.push_back(static_cast<BufferType>(value));
+ return common::E_OK;
+ }
+
+ int flush(common::ByteStream &out_stream) override;
+
+ int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(int32_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(int64_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(float, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(double, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+
+protected:
+ std::vector<BufferType> points_buffer_;
+ uint32_t page_size_;
+ int k_;
+};
+
+// ====================================================================
+// DoubleKClusterEncoder for FLOAT and DOUBLE
+// ====================================================================
+
+class DoubleKClusterEncoder : public Encoder, private KClusterEncoderData<double> {
+public:
+ DoubleKClusterEncoder() = default;
+ ~DoubleKClusterEncoder() override {}
+
+ void reset() override { reset_data(); }
+
+ int get_max_byte_size() override {
+ return page_size_ * sizeof(double) * 2;
+ }
+
+ // --- Overrides for supported types ---
+ int encode(double value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(value);
+ return common::E_OK;
+ }
+ int encode(float value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(static_cast<double>(value));
+ return common::E_OK;
+ }
+
+ // --- Override for flush ---
+ int flush(common::ByteStream &out_stream) override;
+
+ // --- Overrides for unsupported types ---
+ int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(int32_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(int64_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+// ====================================================================
+// IntKClusterEncoder for INT32 and INT64
+// ====================================================================
+class IntKClusterEncoder : public Encoder, private KClusterEncoderData<long long> {
+public:
+ IntKClusterEncoder() = default;
+ ~IntKClusterEncoder() override {}
+
+ void reset() override { reset_data(); }
+
+ int get_max_byte_size() override {
+ return page_size_ * sizeof(long long) * 2;
+ }
+
+ // --- Overrides for supported types ---
+ int encode(long long value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(value);
+ return common::E_OK;
+ }
+ int encode(int32_t value, common::ByteStream &out_stream) override {
+ points_buffer_.push_back(static_cast<long long>(value));
+ return common::E_OK;
+ }
+
+ // --- Override for flush ---
+ int flush(common::ByteStream &out_stream) override;
+
+ // --- Overrides for unsupported types ---
+ int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(float, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(double, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+ int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+
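+// Serialized page layout produced by the flush() implementations below (all fields bit-packed):
+//   k (16) | point count (16)
+//   per dim: decimal places (8), min value as [bit length (8) | sign (1) | magnitude]
+//   per dim: medoid minimum as [bit length (8) | sign (1) | magnitude], medoid offset width (8)
+//   pack count (16), per-pack residual bit widths (8 each)
+//   medoid offsets | frequency bitstream (per-block widths, then size deltas) | residuals
+// The resulting byte array is written to the stream prefixed with its 4-byte size.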
+template<>
+inline int KClusterEncoderBase<double, double>::flush(common::ByteStream &out_stream) {
+ if (points_buffer_.empty()) {
+ return common::E_OK;
+ }
+ try {
+ int dim = 1;
+ int pack_size = 10;
+ int block_size = 10;
+
+ auto preproc_res_ptr = KClusterLogic::preprocessPageFromDoubles(points_buffer_, dim);
+ if (!preproc_res_ptr || !preproc_res_ptr->data) {
+ reset();
+ return common::E_OK;
+ }
+
+ KClusterLogic::ClusteringResult clustering_res = KClusterLogic::kMedoidLogCost(*preproc_res_ptr->data, k_, 10, 1.0e-3, dim);
+
+ using namespace KClusterLogic;
+ int page_k = clustering_res.medoids.size();
+ BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+ std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+
+ EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+ BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
+ BitBuffer single_page_bitstream;
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+ for (int j = 0; j < dim; ++j) {
+ BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+ long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ }
+ for (int j = 0; j < dim; ++j) {
+ long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+ }
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+ for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+ single_page_bitstream.merge(basepoint_res.base_bitstream);
+ single_page_bitstream.merge(fre_bitstream);
+ single_page_bitstream.merge(bitstream_res.residual_bitstream);
+
+ std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+ uint32_t data_size = compressed_bytes.size();
+ out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+ if (data_size > 0) {
+ out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+ }
+ } catch (const std::exception& e) {
+ reset();
+ return common::E_ENCODING_ERROR;
+ }
+ reset();
+ return common::E_OK;
+}
+
+inline int DoubleKClusterEncoder::flush(common::ByteStream &out_stream) {
+ if (points_buffer_.empty()) {
+ return common::E_OK;
+ }
+ try {
+ int dim = 1;
+ int pack_size = 10;
+ int block_size = 10;
+
+ auto preproc_res_ptr = KClusterLogic::preprocessPageFromDoubles(points_buffer_, dim);
+ if (!preproc_res_ptr || !preproc_res_ptr->data) {
+ reset();
+ return common::E_OK;
+ }
+
+ KClusterLogic::ClusteringResult clustering_res = KClusterLogic::kMedoidLogCost(*preproc_res_ptr->data, k_, 10, 1.0e-3, dim);
+
+ using namespace KClusterLogic;
+ int page_k = clustering_res.medoids.size();
+ BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+ std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+
+ EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+ BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
+ BitBuffer single_page_bitstream;
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+ for (int j = 0; j < dim; ++j) {
+ BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+ long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ }
+ for (int j = 0; j < dim; ++j) {
+ long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+ }
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+ for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+ single_page_bitstream.merge(basepoint_res.base_bitstream);
+ single_page_bitstream.merge(fre_bitstream);
+ single_page_bitstream.merge(bitstream_res.residual_bitstream);
+
+ std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+ uint32_t data_size = compressed_bytes.size();
+ out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+ if (data_size > 0) {
+ out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+ }
+ } catch (const std::exception& e) {
+ reset();
+ return common::E_ENCODING_ERROR;
+ }
+ reset();
+ return common::E_OK;
+}
+
+inline int IntKClusterEncoder::flush(common::ByteStream &out_stream) {
+ if (points_buffer_.empty()) {
+ return common::E_OK;
+ }
+ try {
+ int dim = 1;
+ int pack_size = 10;
+ int block_size = 10;
+
+        // Integer data needs no decimal scaling, so use preprocessPageFromLongs instead of the double path
+ auto preproc_res_ptr = KClusterLogic::preprocessPageFromLongs(points_buffer_, dim);
+ if (!preproc_res_ptr || !preproc_res_ptr->data) {
+ reset();
+ return common::E_OK;
+ }
+
+ KClusterLogic::ClusteringResult clustering_res = KClusterLogic::kMedoidLogCost(*preproc_res_ptr->data, k_, 10, 1.0e-3, dim);
+
+ using namespace KClusterLogic;
+ int page_k = clustering_res.medoids.size();
+ BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+ std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+
+ EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+ BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
+ BitBuffer single_page_bitstream;
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+ for (int j = 0; j < dim; ++j) {
+ BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+ long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ }
+ for (int j = 0; j < dim; ++j) {
+ long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+ BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+ }
+ BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+ for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+ single_page_bitstream.merge(basepoint_res.base_bitstream);
+ single_page_bitstream.merge(fre_bitstream);
+ single_page_bitstream.merge(bitstream_res.residual_bitstream);
+
+ std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+ uint32_t data_size = compressed_bytes.size();
+ out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+ if (data_size > 0) {
+ out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+ }
+ } catch (const std::exception& e) {
+ reset();
+ return common::E_ENCODING_ERROR;
+ }
+ reset();
+ return common::E_OK;
+}
+}
+
+#endif // ENCODING_KCLUSTER_ENCODER_H
\ No newline at end of file
diff --git a/cpp/test/encoding/cluster_codec_test.cc b/cpp/test/encoding/cluster_codec_test.cc
new file mode 100644
index 0000000..7126bc2
--- /dev/null
+++ b/cpp/test/encoding/cluster_codec_test.cc
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <vector>
+#include <cmath>
+#include <cstdlib>
+
+// Include the new encoder and decoder header files
+#include "encoding/kcluster_encoder.h"
+#include "encoding/kcluster_decoder.h"
+#include "encoding/acluster_encoder.h"
+#include "encoding/acluster_decoder.h"
+
+// Note: this test uses raw new/delete directly, following the style of ts2diff_codec_test.cc.
+// The framework may construct codecs through a factory with a custom memory allocator;
+// if this test is integrated into the project's test runner, the factory would be preferable.
+
+namespace storage {
+
+class ClusterCodecTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ // KCluster instances
+ kcluster_encoder_double_ = new DoubleKClusterEncoder();
+ kcluster_decoder_double_ = new DoubleKClusterDecoder();
+ kcluster_encoder_int_ = new IntKClusterEncoder();
+ kcluster_decoder_int_ = new IntKClusterDecoder();
+
+ // ACluster instances
+ acluster_encoder_double_ = new DoubleAClusterEncoder();
+ acluster_decoder_double_ = new DoubleAClusterDecoder();
+ acluster_encoder_int_ = new IntAClusterEncoder();
+ acluster_decoder_int_ = new IntAClusterDecoder();
+ }
+
+ void TearDown() override {
+ // KCluster
+ delete kcluster_encoder_double_;
+ delete kcluster_decoder_double_;
+ delete kcluster_encoder_int_;
+ delete kcluster_decoder_int_;
+
+ // ACluster
+ delete acluster_encoder_double_;
+ delete acluster_decoder_double_;
+ delete acluster_encoder_int_;
+ delete acluster_decoder_int_;
+ }
+
+ // KCluster pointers
+ DoubleKClusterEncoder* kcluster_encoder_double_;
+ DoubleKClusterDecoder* kcluster_decoder_double_;
+ IntKClusterEncoder* kcluster_encoder_int_;
+ IntKClusterDecoder* kcluster_decoder_int_;
+
+ // ACluster pointers
+ DoubleAClusterEncoder* acluster_encoder_double_;
+ DoubleAClusterDecoder* acluster_decoder_double_;
+ IntAClusterEncoder* acluster_encoder_int_;
+ IntAClusterDecoder* acluster_decoder_int_;
+};
+
+
+// ===================================================================
+// KCluster Tests
+// ===================================================================
+
+TEST_F(ClusterCodecTest, KClusterDoubleEncoding) {
+ common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+  const int row_num = 1000;  // The encoder is page-based, so use a point count that fits within a single page.
+ std::vector<double> data(row_num);
+
+ // Generate some test data, e.g., a sine wave with some noise
+ for (int i = 0; i < row_num; i++) {
+ data[i] = 100.0 * std::sin(i * 0.1) + (static_cast<double>(rand()) / RAND_MAX - 0.5);
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(kcluster_encoder_double_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(kcluster_encoder_double_->flush(out_stream), common::E_OK);
+
+ double x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(kcluster_decoder_double_->read_double(x, out_stream), common::E_OK);
+    // The encoder quantizes doubles to at most six decimal places, so compare with a matching tolerance
+    ASSERT_NEAR(x, data[i], 1e-6);
+ }
+}
+
+TEST_F(ClusterCodecTest, KClusterIntEncoding) {
+ common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+ const int row_num = 1000;
+ std::vector<int64_t> data(row_num);
+
+ // Generate some integer data
+ for (int i = 0; i < row_num; i++) {
+ data[i] = static_cast<int64_t>(i * i) - (i * 10);
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(kcluster_encoder_int_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(kcluster_encoder_int_->flush(out_stream), common::E_OK);
+
+ int64_t x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(kcluster_decoder_int_->read_int64(x, out_stream), common::E_OK);
+ EXPECT_EQ(x, data[i]);
+ }
+}
+
+
+// ===================================================================
+// ACluster Tests
+// ===================================================================
+
+TEST_F(ClusterCodecTest, AClusterDoubleEncoding) {
+ common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+ const int row_num = 1000;
+ std::vector<double> data(row_num);
+
+ // Generate some test data, e.g., a linear slope with some periodic component
+ for (int i = 0; i < row_num; i++) {
+ data[i] = 0.5 * i + 50.0 * std::cos(i * 0.2);
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(acluster_encoder_double_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(acluster_encoder_double_->flush(out_stream), common::E_OK);
+
+ double x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(acluster_decoder_double_->read_double(x, out_stream), common::E_OK);
+ ASSERT_NEAR(x, data[i], 1e-9);
+ }
+}
+
+TEST_F(ClusterCodecTest, AClusterIntEncoding) {
+ common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+ const int row_num = 1000;
+ std::vector<int64_t> data(row_num);
+
+ // Generate some different integer data
+ for (int i = 0; i < row_num; i++) {
+ data[i] = 10000 + (i % 50) * 100; // Data that should cluster well
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(acluster_encoder_int_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(acluster_encoder_int_->flush(out_stream), common::E_OK);
+
+ int64_t x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(acluster_decoder_int_->read_int64(x, out_stream), common::E_OK);
+ EXPECT_EQ(x, data[i]);
+ }
+}
+
+TEST_F(ClusterCodecTest, KClusterFloatAsDoubleEncoding) {
+ common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+ const int row_num = 500;
+ std::vector<float> data(row_num);
+
+ for (int i = 0; i < row_num; i++) {
+ data[i] = 123.45f + static_cast<float>(i);
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(kcluster_encoder_double_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(kcluster_encoder_double_->flush(out_stream), common::E_OK);
+
+ float x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(kcluster_decoder_double_->read_float(x, out_stream), common::E_OK);
+ ASSERT_NEAR(x, data[i], 1e-6);
+ }
+}
+
+TEST_F(ClusterCodecTest, AClusterInt32AsInt64Encoding) {
+ common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+ const int row_num = 500;
+ std::vector<int32_t> data(row_num);
+
+ for (int i = 0; i < row_num; i++) {
+ data[i] = 500 - i;
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(acluster_encoder_int_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(acluster_encoder_int_->flush(out_stream), common::E_OK);
+
+ int32_t x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(acluster_decoder_int_->read_int32(x, out_stream), common::E_OK);
+ EXPECT_EQ(x, data[i]);
+ }
+}
+
+} // namespace storage
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index b05a920..8c6be19 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -56,7 +56,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>3.16.0</version>
+ <version>3.15.0</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
@@ -99,7 +99,7 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.11.0</version>
+ <version>2.10.1</version>
</dependency>
</dependencies>
</dependencyManagement>
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
index 7047984..59a914f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
@@ -1,7 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.tsfile.encoding.decoder;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.encoding.TsFileDecodingException;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import java.io.IOException;
@@ -9,207 +27,199 @@
public class ClusterDecoder extends Decoder {
- private final TSDataType dataType;
+ private final TSDataType dataType;
+ private long[] longValues;
+ private double[] doubleValues;
+ private int readIndex = 0;
+ private int count = 0;
+ private boolean hasDecoded = false;
- private long[] longValues;
- private double[] doubleValues;
- private int readIndex = 0;
- private int count = 0;
- private boolean hasDecoded = false;
+ public ClusterDecoder(TSDataType dataType) {
+ super(TSEncoding.ACLUSTER);
+ this.dataType = dataType;
+ }
- public ClusterDecoder(TSDataType dataType) {
- super(TSEncoding.ACLUSTER);
- this.dataType = dataType;
+ /** check has next */
+ @Override
+ public boolean hasNext(ByteBuffer buffer) throws IOException {
+ if (!hasDecoded) {
+ decodeInternally(buffer);
+ }
+ return readIndex < count;
+ }
+
+ /** reset */
+ @Override
+ public void reset() {
+ this.hasDecoded = false;
+ this.readIndex = 0;
+ this.count = 0;
+ this.longValues = null;
+ this.doubleValues = null;
+ }
+
+ @Override
+ public int readInt(ByteBuffer buffer) {
+ return (int) longValues[readIndex++];
+ }
+
+ @Override
+ public long readLong(ByteBuffer buffer) {
+ return longValues[readIndex++];
+ }
+
+ @Override
+ public float readFloat(ByteBuffer buffer) {
+ return (float) doubleValues[readIndex++];
+ }
+
+ @Override
+ public double readDouble(ByteBuffer buffer) {
+ return doubleValues[readIndex++];
+ }
+
+ /** Internal decode method Decode ACluster */
+ private void decodeInternally(ByteBuffer buffer) {
+ if (hasDecoded) {
+ return;
}
- /**
- * check has next
- */
- @Override
- public boolean hasNext(ByteBuffer buffer) throws IOException {
- if (!hasDecoded) {
- decodeInternally(buffer);
- }
- return readIndex < count;
+ ClusterReader reader = new ClusterReader(buffer);
+
+ // --- Header ---
+ int scalingExponent = (int) reader.read(8);
+ int k = (int) reader.read(16);
+ this.count = (int) reader.read(16);
+ int packSize = (int) reader.read(16);
+
+ // --- Global Minimum Value (minVal) ---
+ int minValBit = (int) reader.read(8);
+ long minValSign = reader.read(1);
+ long absMinVal = reader.read(minValBit); // Read the absolute value part
+ long minVal = (minValSign == 1) ? -absMinVal : absMinVal; // Apply the sign
+
+ // Allocate memory based on count and dataType
+ if (this.count > 0) {
+ if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
+ this.doubleValues = new double[this.count];
+ } else {
+ this.longValues = new long[this.count];
+ }
}
- /**
- * reset
- */
- @Override
- public void reset() {
- this.hasDecoded = false;
- this.readIndex = 0;
- this.count = 0;
- this.longValues = null;
- this.doubleValues = null;
+ // Handle case where all values are the same (k=0)
+ if (k == 0) {
+ if (this.count > 0) {
+ reconstructFromMinValOnly(minVal, scalingExponent);
+ }
+ this.hasDecoded = true;
+ return;
}
- @Override
- public int readInt(ByteBuffer buffer){
- return (int) longValues[readIndex++];
+ // --- Medoids ---
+ long[] medoids = new long[k];
+ int minMedoidBit = (int) reader.read(8);
+ long minMedoidSign = reader.read(1);
+ long absMinMedoid = reader.read(minMedoidBit); // Read the absolute value part
+ long minMedoid = (minMedoidSign == 1) ? -absMinMedoid : absMinMedoid; // Apply the sign
+
+ int maxMedoidOffsetBits = (int) reader.read(8);
+ for (int i = 0; i < k; i++) {
+ long offset = reader.read(maxMedoidOffsetBits);
+ medoids[i] = minMedoid + offset;
}
- @Override
- public long readLong(ByteBuffer buffer){
- return longValues[readIndex++];
+ // --- Frequencies (Cluster Sizes) ---
+ // The encoder wrote deltas (cluster sizes), so we read them and rebuild the cumulative array
+ long[] cumulativeFrequencies = new long[k];
+ int numFreqBlocks = (int) reader.read(16);
+
+ // Metadata pass for frequencies
+ int[] freqBlockMaxBits = new int[numFreqBlocks];
+ for (int i = 0; i < numFreqBlocks; i++) {
+ freqBlockMaxBits[i] = (int) reader.read(8);
}
- @Override
- public float readFloat(ByteBuffer buffer){
- return (float) doubleValues[readIndex++];
+ // Data pass for frequencies - Reconstruct cumulative frequencies
+ long currentCumulativeFreq = 0;
+ int freqIndex = 0;
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * packSize;
+ int end = Math.min(start + packSize, k);
+ int bitsForBlock = freqBlockMaxBits[i];
+ for (int j = start; j < end; j++) {
+ long delta = reader.read(bitsForBlock); // This delta is the actual cluster size
+ currentCumulativeFreq += delta;
+ cumulativeFrequencies[freqIndex++] = currentCumulativeFreq;
+ }
}
- @Override
- public double readDouble(ByteBuffer buffer){
- return doubleValues[readIndex++];
+ // --- Residuals ---
+ long[] residuals = new long[this.count];
+ int numPacks = (int) reader.read(32);
+
+ // Metadata pass for residuals
+ int[] resPackMaxBits = new int[numPacks];
+ for (int i = 0; i < numPacks; i++) {
+ resPackMaxBits[i] = (int) reader.read(8);
}
-
- /**
- * Internal decode method
- * Decode ACluster
- */
- private void decodeInternally(ByteBuffer buffer) {
- if (hasDecoded) {
- return;
+ // Data pass for residuals
+ int residualIdx = 0;
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * packSize;
+ int end = Math.min(start + packSize, this.count);
+ int bitsForPack = resPackMaxBits[i];
+ if (bitsForPack > 0) {
+ for (int j = start; j < end; j++) {
+ residuals[residualIdx++] = reader.read(bitsForPack);
}
-
- ClusterReader reader = new ClusterReader(buffer);
-
- // --- Header ---
- int scalingExponent = (int) reader.read(8);
- int k = (int) reader.read(16);
- this.count = (int) reader.read(16);
- int packSize = (int) reader.read(16);
-
- // --- Global Minimum Value (minVal) ---
- int minValBit = (int) reader.read(8);
- long minValSign = reader.read(1);
- long absMinVal = reader.read(minValBit); // Read the absolute value part
- long minVal = (minValSign == 1) ? -absMinVal : absMinVal; // Apply the sign
-
- // Allocate memory based on count and dataType
- if (this.count > 0) {
- if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
- this.doubleValues = new double[this.count];
- } else {
- this.longValues = new long[this.count];
- }
- }
-
- // Handle case where all values are the same (k=0)
- if (k == 0) {
- if (this.count > 0) {
- reconstructFromMinValOnly(minVal, scalingExponent);
- }
- this.hasDecoded = true;
- return;
- }
-
- // --- Medoids ---
- long[] medoids = new long[k];
- int minMedoidBit = (int) reader.read(8);
- long minMedoidSign = reader.read(1);
- long absMinMedoid = reader.read(minMedoidBit); // Read the absolute value part
- long minMedoid = (minMedoidSign == 1) ? -absMinMedoid : absMinMedoid; // Apply the sign
-
- int maxMedoidOffsetBits = (int) reader.read(8);
- for (int i = 0; i < k; i++) {
- long offset = reader.read(maxMedoidOffsetBits);
- medoids[i] = minMedoid + offset;
- }
-
- // --- Frequencies (Cluster Sizes) ---
- // The encoder wrote deltas (cluster sizes), so we read them and rebuild the cumulative array
- long[] cumulativeFrequencies = new long[k];
- int numFreqBlocks = (int) reader.read(16);
-
- // Metadata pass for frequencies
- int[] freqBlockMaxBits = new int[numFreqBlocks];
- for (int i = 0; i < numFreqBlocks; i++) {
- freqBlockMaxBits[i] = (int) reader.read(8);
- }
-
- // Data pass for frequencies - Reconstruct cumulative frequencies
- long currentCumulativeFreq = 0;
- int freqIndex = 0;
- for (int i = 0; i < numFreqBlocks; i++) {
- int start = i * packSize;
- int end = Math.min(start + packSize, k);
- int bitsForBlock = freqBlockMaxBits[i];
- for (int j = start; j < end; j++) {
- long delta = reader.read(bitsForBlock); // This delta is the actual cluster size
- currentCumulativeFreq += delta;
- cumulativeFrequencies[freqIndex++] = currentCumulativeFreq;
- }
- }
-
- // --- Residuals ---
- long[] residuals = new long[this.count];
- int numPacks = (int) reader.read(32);
-
- // Metadata pass for residuals
- int[] resPackMaxBits = new int[numPacks];
- for (int i = 0; i < numPacks; i++) {
- resPackMaxBits[i] = (int) reader.read(8);
- }
-
- // Data pass for residuals
- int residualIdx = 0;
- for (int i = 0; i < numPacks; i++) {
- int start = i * packSize;
- int end = Math.min(start + packSize, this.count);
- int bitsForPack = resPackMaxBits[i];
- if (bitsForPack > 0) {
- for (int j = start; j < end; j++) {
- residuals[residualIdx++] = reader.read(bitsForPack);
- }
- } else {
- // If bitsForPack is 0, all residuals in this pack are 0.
- // We just need to advance the index, as the array is already initialized to 0.
- residualIdx += (end - start);
- }
- }
-
- // --- Final Data Reconstruction ---
- // Use the correctly reconstructed cumulativeFrequencies array
- reconstructData(medoids, cumulativeFrequencies, residuals, minVal, scalingExponent);
-
- this.hasDecoded = true;
+ } else {
+ // If bitsForPack is 0, all residuals in this pack are 0.
+ // We just need to advance the index, as the array is already initialized to 0.
+ residualIdx += (end - start);
+ }
}
- private void reconstructData(long[] medoids, long[] frequencies, long[] residuals, long minVal, int scalingExponent) {
- int residualReadPos = 0;
- int dataWritePos = 0;
- double scalingFactor = Math.pow(10, scalingExponent);
+ // --- Final Data Reconstruction ---
+ // Use the correctly reconstructed cumulativeFrequencies array
+ reconstructData(medoids, cumulativeFrequencies, residuals, minVal, scalingExponent);
- for (int clusterId = 0; clusterId < medoids.length; clusterId++) {
- long pointsInThisCluster = frequencies[clusterId];
- long medoid = medoids[clusterId];
+ this.hasDecoded = true;
+ }
- for (int i = 0; i < pointsInThisCluster; i++) {
- long zigzagResidual = residuals[residualReadPos++];
- long residual = (zigzagResidual >>> 1) ^ -(zigzagResidual & 1);
- long scaledDataPoint = medoid + residual + minVal;
+ private void reconstructData(
+ long[] medoids, long[] frequencies, long[] residuals, long minVal, int scalingExponent) {
+ int residualReadPos = 0;
+ int dataWritePos = 0;
+ double scalingFactor = Math.pow(10, scalingExponent);
- if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
- doubleValues[dataWritePos++] = scaledDataPoint / scalingFactor;
- } else {
- longValues[dataWritePos++] = scaledDataPoint;
- }
- }
- }
- }
+ for (int clusterId = 0; clusterId < medoids.length; clusterId++) {
+ long pointsInThisCluster = frequencies[clusterId];
+ long medoid = medoids[clusterId];
- private void reconstructFromMinValOnly(long minVal, int scalingExponent) {
+ for (int i = 0; i < pointsInThisCluster; i++) {
+ long zigzagResidual = residuals[residualReadPos++];
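+        // Undo zigzag encoding: 0, 1, 2, 3, ... maps back to 0, -1, 1, -2, ...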
+ long residual = (zigzagResidual >>> 1) ^ -(zigzagResidual & 1);
+ long scaledDataPoint = medoid + residual + minVal;
+
if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
- double scalingFactor = Math.pow(10, scalingExponent);
- double finalValue = minVal / scalingFactor;
- for(int i=0; i<this.count; i++) doubleValues[i] = finalValue;
+ doubleValues[dataWritePos++] = scaledDataPoint / scalingFactor;
} else {
- for(int i=0; i<this.count; i++) longValues[i] = minVal;
+ longValues[dataWritePos++] = scaledDataPoint;
}
+ }
}
+ }
+
+ private void reconstructFromMinValOnly(long minVal, int scalingExponent) {
+ if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
+ double scalingFactor = Math.pow(10, scalingExponent);
+ double finalValue = minVal / scalingFactor;
+ for (int i = 0; i < this.count; i++) doubleValues[i] = finalValue;
+ } else {
+ for (int i = 0; i < this.count; i++) longValues[i] = minVal;
+ }
+ }
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
index ad578ad..99ebb66 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
@@ -1,35 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.tsfile.encoding.decoder;
import java.nio.ByteBuffer;
public class ClusterReader {
- private final ByteBuffer buffer;
- private byte currentByte;
- private int bitPosition; // from 7 down to 0
+ private final ByteBuffer buffer;
+ private byte currentByte;
+ private int bitPosition; // from 7 down to 0
- public ClusterReader(ByteBuffer buffer) {
- this.buffer = buffer;
- this.currentByte = 0;
- this.bitPosition = -1; // Start at -1 to force reading a new byte first
+ public ClusterReader(ByteBuffer buffer) {
+ this.buffer = buffer;
+ this.currentByte = 0;
+ this.bitPosition = -1; // Start at -1 to force reading a new byte first
+ }
+
+ public long read(int numBits) {
+ if (numBits > 64 || numBits <= 0) {
+ throw new IllegalArgumentException(
+ "Cannot read more than 64 bits or non-positive bits at once.");
}
- public long read(int numBits) {
- if (numBits > 64 || numBits <= 0) {
- throw new IllegalArgumentException("Cannot read more than 64 bits or non-positive bits at once.");
- }
-
- long result = 0;
- for (int i = 0; i < numBits; i++) {
- if (bitPosition < 0) {
- currentByte = buffer.get();
- bitPosition = 7;
- }
- // Read the bit at the current position
- long bit = (currentByte >> bitPosition) & 1;
- // Shift the result and add the new bit
- result = (result << 1) | bit;
- bitPosition--;
- }
- return result;
+ long result = 0;
+ for (int i = 0; i < numBits; i++) {
+ if (bitPosition < 0) {
+ currentByte = buffer.get();
+ bitPosition = 7;
+ }
+ // Read the bit at the current position
+ long bit = (currentByte >> bitPosition) & 1;
+ // Shift the result and add the new bit
+ result = (result << 1) | bit;
+ bitPosition--;
}
-}
\ No newline at end of file
+ return result;
+ }
+}
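ClusterReader consumes the MSB-first bit stream that ClusterSupport writes on the encode path. A small usage sketch against a hand-built two-byte buffer (the field widths and values are arbitrary):

import java.nio.ByteBuffer;
import org.apache.tsfile.encoding.decoder.ClusterReader;

public class ClusterReaderDemo {
  public static void main(String[] args) {
    // Bit stream: 10100001 11000000
    ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0b1010_0001, (byte) 0b1100_0000});
    ClusterReader reader = new ClusterReader(buf);
    System.out.println(reader.read(4)); // bits 1010 -> 10
    System.out.println(reader.read(6)); // bits 000111 -> 7
  }
}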
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
index bc61bda..9f3a724 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
@@ -177,14 +177,18 @@
default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
}
- case ACLUSTER: case KCLUSTER:
- switch (dataType){
- case INT32: case INT64: case FLOAT: case DOUBLE:
+ case ACLUSTER:
+ case KCLUSTER:
+ switch (dataType) {
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
return new ClusterDecoder(dataType);
default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
}
- default:
+ default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
}
}
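The hunk above lives in the decoder factory, so both new encodings resolve to the shared ClusterDecoder for the four numeric types. Assuming the enclosing method is the existing Decoder.getDecoderByType (its signature is not visible in this hunk), usage would look like:

import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;

public class ClusterDecoderLookup {
  public static void main(String[] args) {
    // Hypothetical call; assumes the factory method shown partially above is getDecoderByType.
    Decoder decoder = Decoder.getDecoderByType(TSEncoding.ACLUSTER, TSDataType.DOUBLE);
    System.out.println(decoder.getClass().getSimpleName()); // ClusterDecoder
  }
}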
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
index 44d4d91..f4e079f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
@@ -24,190 +24,185 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
public final class AClusterAlgorithm {
- /**
- * Private constructor to prevent instantiation of this utility class.
- */
- private AClusterAlgorithm() {}
+ /** Private constructor to prevent instantiation of this utility class. */
+ private AClusterAlgorithm() {}
- /**
- * The main entry point for the ACluster algorithm.
- * It processes a page of data and returns the results as an Object array.
- *
- * @param data The input time series data for a single page, represented as a long array.
- * @return An Object array where: <br>
- * - index 0: long[] medoids (sorted by cluster frequency) <br>
- * - index 1: int[] clusterAssignments (mapped to the sorted medoids) <br>
- * - index 2: long[] clusterFrequencies (sorted)
- */
- public static Object[] run(long[] data) {
- int n = data.length;
- if (n == 0) {
- return new Object[]{new long[0], new int[0], new long[0]};
- }
-
- // --- Initialization ---
- List<Long> medoids = new ArrayList<>();
- Set<Long> existingMedoids = new HashSet<>();
- List<Set<Integer>> pointsInClusters = new ArrayList<>();
- int[] pointToMedoidMap = new int[n];
-
- // --- Step 1: Initialize with the first data point ---
- long firstPoint = data[0];
- medoids.add(firstPoint);
- existingMedoids.add(firstPoint);
- Set<Integer> firstCluster = new HashSet<>();
- firstCluster.add(0);
- pointsInClusters.add(firstCluster);
- pointToMedoidMap[0] = 0;
-
- // --- Step 2: Iteratively process the rest of the points ---
- for (int i = 1; i < n; i++) {
- long currentPoint = data[i];
-
- if (existingMedoids.contains(currentPoint)) {
- int medoidIndex = medoids.indexOf(currentPoint);
- pointsInClusters.get(medoidIndex).add(i);
- pointToMedoidMap[i] = medoidIndex;
- continue;
- }
-
- // --- Step 3: Find the best existing medoid ---
- int bestMedoidIndex = -1;
- long minCostToExistingMedoid = Long.MAX_VALUE;
- for (int j = 0; j < medoids.size(); j++) {
- long cost = calculateResidualCost(currentPoint, medoids.get(j));
- if (cost < minCostToExistingMedoid) {
- minCostToExistingMedoid = cost;
- bestMedoidIndex = j;
- }
- }
-
- // --- Step 4: Calculate potential savings ---
- long savingsFromCurrentPoint = minCostToExistingMedoid;
- long savingsFromReassignment = 0;
- Set<Integer> pointsInBestCluster = pointsInClusters.get(bestMedoidIndex);
- for (int pointIndexInCluster : pointsInBestCluster) {
- long p = data[pointIndexInCluster];
- long costToOldMedoid = calculateResidualCost(p, medoids.get(bestMedoidIndex));
- long costToNewPotentialMedoid = calculateResidualCost(p, currentPoint);
- if (costToNewPotentialMedoid < costToOldMedoid) {
- savingsFromReassignment += (costToOldMedoid - costToNewPotentialMedoid);
- }
- }
- long totalSavings = savingsFromCurrentPoint + savingsFromReassignment;
-
- // --- Step 5: Make the decision ---
- long storageCostForNewPoint = calculateBasePointStorageCost(currentPoint);
- if (totalSavings > storageCostForNewPoint) {
- // Decision: Create a new medoid.
- int newMedoidId = medoids.size();
- medoids.add(currentPoint);
- existingMedoids.add(currentPoint);
- Set<Integer> newCluster = new HashSet<>();
- newCluster.add(i);
- pointsInClusters.add(newCluster);
- pointToMedoidMap[i] = newMedoidId;
-
- Set<Integer> pointsToReEvaluate = new HashSet<>(pointsInClusters.get(bestMedoidIndex));
- for (int pointIndexToReEvaluate : pointsToReEvaluate) {
- long p = data[pointIndexToReEvaluate];
- if (calculateResidualCost(p, currentPoint) < calculateResidualCost(p, medoids.get(bestMedoidIndex))) {
- pointsInClusters.get(bestMedoidIndex).remove(pointIndexToReEvaluate);
- pointsInClusters.get(newMedoidId).add(pointIndexToReEvaluate);
- pointToMedoidMap[pointIndexToReEvaluate] = newMedoidId;
- }
- }
- } else {
- // Decision: Assign to the existing best medoid.
- pointsInClusters.get(bestMedoidIndex).add(i);
- pointToMedoidMap[i] = bestMedoidIndex;
- }
- }
-
- // --- Step 6: Finalize and sort the results ---
- int k = medoids.size();
- long[] finalMedoids = medoids.stream().mapToLong(l -> l).toArray();
- long[] rawClusterSizes = new long[k];
- for (int i = 0; i < k; i++) {
- rawClusterSizes[i] = pointsInClusters.get(i).size();
- }
-
- return sortResults(finalMedoids, pointToMedoidMap, rawClusterSizes);
+ /**
+ * The main entry point for the ACluster algorithm. It processes a page of data and returns the
+ * results as an Object array.
+ *
+ * @param data The input time series data for a single page, represented as a long array.
+ * @return An Object array where: <br>
+ * - index 0: long[] medoids (sorted by cluster frequency) <br>
+ * - index 1: int[] clusterAssignments (mapped to the sorted medoids) <br>
+ * - index 2: long[] clusterFrequencies (sorted)
+ */
+ public static Object[] run(long[] data) {
+ int n = data.length;
+ if (n == 0) {
+ return new Object[] {new long[0], new int[0], new long[0]};
}
- /**
- * Helper class for sorting medoids based on their cluster size (frequency).
- */
- private static class MedoidSortHelper implements Comparable<MedoidSortHelper> {
- long medoid;
- long size;
- int originalIndex;
+ // --- Initialization ---
+ List<Long> medoids = new ArrayList<>();
+ Set<Long> existingMedoids = new HashSet<>();
+ List<Set<Integer>> pointsInClusters = new ArrayList<>();
+ int[] pointToMedoidMap = new int[n];
- MedoidSortHelper(long medoid, long size, int originalIndex) {
- this.medoid = medoid;
- this.size = size;
- this.originalIndex = originalIndex;
- }
+ // --- Step 1: Initialize with the first data point ---
+ long firstPoint = data[0];
+ medoids.add(firstPoint);
+ existingMedoids.add(firstPoint);
+ Set<Integer> firstCluster = new HashSet<>();
+ firstCluster.add(0);
+ pointsInClusters.add(firstCluster);
+ pointToMedoidMap[0] = 0;
- @Override
- public int compareTo(MedoidSortHelper other) {
- return Long.compare(this.size, other.size);
+ // --- Step 2: Iteratively process the rest of the points ---
+ for (int i = 1; i < n; i++) {
+ long currentPoint = data[i];
+
+ if (existingMedoids.contains(currentPoint)) {
+ int medoidIndex = medoids.indexOf(currentPoint);
+ pointsInClusters.get(medoidIndex).add(i);
+ pointToMedoidMap[i] = medoidIndex;
+ continue;
+ }
+
+ // --- Step 3: Find the best existing medoid ---
+ int bestMedoidIndex = -1;
+ long minCostToExistingMedoid = Long.MAX_VALUE;
+ for (int j = 0; j < medoids.size(); j++) {
+ long cost = calculateResidualCost(currentPoint, medoids.get(j));
+ if (cost < minCostToExistingMedoid) {
+ minCostToExistingMedoid = cost;
+ bestMedoidIndex = j;
}
+ }
+
+ // --- Step 4: Calculate potential savings ---
+ long savingsFromCurrentPoint = minCostToExistingMedoid;
+ long savingsFromReassignment = 0;
+ Set<Integer> pointsInBestCluster = pointsInClusters.get(bestMedoidIndex);
+ for (int pointIndexInCluster : pointsInBestCluster) {
+ long p = data[pointIndexInCluster];
+ long costToOldMedoid = calculateResidualCost(p, medoids.get(bestMedoidIndex));
+ long costToNewPotentialMedoid = calculateResidualCost(p, currentPoint);
+ if (costToNewPotentialMedoid < costToOldMedoid) {
+ savingsFromReassignment += (costToOldMedoid - costToNewPotentialMedoid);
+ }
+ }
+ long totalSavings = savingsFromCurrentPoint + savingsFromReassignment;
+
+ // --- Step 5: Make the decision ---
+ long storageCostForNewPoint = calculateBasePointStorageCost(currentPoint);
+ if (totalSavings > storageCostForNewPoint) {
+ // Decision: Create a new medoid.
+ int newMedoidId = medoids.size();
+ medoids.add(currentPoint);
+ existingMedoids.add(currentPoint);
+ Set<Integer> newCluster = new HashSet<>();
+ newCluster.add(i);
+ pointsInClusters.add(newCluster);
+ pointToMedoidMap[i] = newMedoidId;
+
+ Set<Integer> pointsToReEvaluate = new HashSet<>(pointsInClusters.get(bestMedoidIndex));
+ for (int pointIndexToReEvaluate : pointsToReEvaluate) {
+ long p = data[pointIndexToReEvaluate];
+ if (calculateResidualCost(p, currentPoint)
+ < calculateResidualCost(p, medoids.get(bestMedoidIndex))) {
+ pointsInClusters.get(bestMedoidIndex).remove(pointIndexToReEvaluate);
+ pointsInClusters.get(newMedoidId).add(pointIndexToReEvaluate);
+ pointToMedoidMap[pointIndexToReEvaluate] = newMedoidId;
+ }
+ }
+ } else {
+ // Decision: Assign to the existing best medoid.
+ pointsInClusters.get(bestMedoidIndex).add(i);
+ pointToMedoidMap[i] = bestMedoidIndex;
+ }
}
- /**
- * Sorts the final medoids and their cluster information based on cluster frequency.
- *
- * @param medoids The discovered medoids.
- * @param clusterAssignment The assignment map for each data point.
- * @param clusterSize The frequency of each cluster.
- * @return A sorted and correctly mapped Object array.
- */
- private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
- int k = medoids.length;
- List<MedoidSortHelper> sorters = new ArrayList<>();
- for (int i = 0; i < k; i++) {
- sorters.add(new MedoidSortHelper(medoids[i], clusterSize[i], i));
- }
- Collections.sort(sorters);
-
- long[] sortedMedoids = new long[k];
- long[] sortedClusterSize = new long[k];
- int[] oldToNewIndexMap = new int[k];
-
- for (int i = 0; i < k; i++) {
- MedoidSortHelper sortedItem = sorters.get(i);
- sortedMedoids[i] = sortedItem.medoid;
- sortedClusterSize[i] = sortedItem.size;
- oldToNewIndexMap[sortedItem.originalIndex] = i;
- }
-
- int[] sortedClusterAssignment = new int[clusterAssignment.length];
- for (int i = 0; i < clusterAssignment.length; i++) {
- int oldIndex = clusterAssignment[i];
- sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
- }
-
- return new Object[]{sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+ // --- Step 6: Finalize and sort the results ---
+ int k = medoids.size();
+ long[] finalMedoids = medoids.stream().mapToLong(l -> l).toArray();
+ long[] rawClusterSizes = new long[k];
+ for (int i = 0; i < k; i++) {
+ rawClusterSizes[i] = pointsInClusters.get(i).size();
}
- // --- Cost Calculation Functions ---
+ return sortResults(finalMedoids, pointToMedoidMap, rawClusterSizes);
+ }
- private static long bitLengthCost(long value) {
- if (value == 0) return 1;
- return 64 - Long.numberOfLeadingZeros(value);
+ /** Helper class for sorting medoids based on their cluster size (frequency). */
+ private static class MedoidSortHelper implements Comparable<MedoidSortHelper> {
+ long medoid;
+ long size;
+ int originalIndex;
+
+ MedoidSortHelper(long medoid, long size, int originalIndex) {
+ this.medoid = medoid;
+ this.size = size;
+ this.originalIndex = originalIndex;
}
- private static long calculateResidualCost(long p1, long p2) {
- return 1 + bitLengthCost(Math.abs(p1 - p2));
+ @Override
+ public int compareTo(MedoidSortHelper other) {
+ return Long.compare(this.size, other.size);
+ }
+ }
+
+ /**
+ * Sorts the final medoids and their cluster information based on cluster frequency.
+ *
+ * @param medoids The discovered medoids.
+ * @param clusterAssignment The assignment map for each data point.
+ * @param clusterSize The frequency of each cluster.
+ * @return A sorted and correctly mapped Object array.
+ */
+ private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
+ int k = medoids.length;
+ List<MedoidSortHelper> sorters = new ArrayList<>();
+ for (int i = 0; i < k; i++) {
+ sorters.add(new MedoidSortHelper(medoids[i], clusterSize[i], i));
+ }
+ Collections.sort(sorters);
+
+ long[] sortedMedoids = new long[k];
+ long[] sortedClusterSize = new long[k];
+ int[] oldToNewIndexMap = new int[k];
+
+ for (int i = 0; i < k; i++) {
+ MedoidSortHelper sortedItem = sorters.get(i);
+ sortedMedoids[i] = sortedItem.medoid;
+ sortedClusterSize[i] = sortedItem.size;
+ oldToNewIndexMap[sortedItem.originalIndex] = i;
}
- private static long calculateBasePointStorageCost(long basePoint) {
- return 1 + bitLengthCost(Math.abs(basePoint));
+ int[] sortedClusterAssignment = new int[clusterAssignment.length];
+ for (int i = 0; i < clusterAssignment.length; i++) {
+ int oldIndex = clusterAssignment[i];
+ sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
}
+
+ return new Object[] {sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+ }
+
+ // --- Cost Calculation Functions ---
+
+ private static long bitLengthCost(long value) {
+ if (value == 0) return 1;
+ return 64 - Long.numberOfLeadingZeros(value);
+ }
+
+ private static long calculateResidualCost(long p1, long p2) {
+ return 1 + bitLengthCost(Math.abs(p1 - p2));
+ }
+
+ private static long calculateBasePointStorageCost(long basePoint) {
+ return 1 + bitLengthCost(Math.abs(basePoint));
+ }
}
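Step 5 above is purely bit-cost driven: a point is promoted to a new medoid only when the bits it saves on itself plus the bits saved by reassigned neighbours exceed the bits needed to store it outright. A self-contained sketch of the same arithmetic with made-up values (the helpers mirror the private cost functions above):

public class AClusterCostSketch {
  static long bitLengthCost(long value) {
    return value == 0 ? 1 : 64 - Long.numberOfLeadingZeros(value);
  }

  static long residualCost(long p, long medoid) {
    return 1 + bitLengthCost(Math.abs(p - medoid));
  }

  static long basePointStorageCost(long p) {
    return 1 + bitLengthCost(Math.abs(p));
  }

  public static void main(String[] args) {
    long candidate = 1_000_000L;        // point considered as a new medoid
    long nearestMedoid = 10L;           // best existing medoid is far away
    long savingsFromCurrentPoint = residualCost(candidate, nearestMedoid); // 21 bits
    long savingsFromReassignment = 30;  // pretend three neighbours would each save ~10 bits
    long newMedoidCost = basePointStorageCost(candidate);                  // 21 bits
    System.out.println(savingsFromCurrentPoint + savingsFromReassignment > newMedoidCost); // true
  }
}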
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
index 8e2ca0b..1cea597 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
@@ -19,8 +19,8 @@
package org.apache.tsfile.encoding.encoder;
-import org.apache.tsfile.exception.encoding.TsFileEncodingException;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.encoding.TsFileEncodingException;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
@@ -37,386 +37,430 @@
*/
public class AClusterEncoder extends Encoder {
- private static final int DEFAULT_PACK_SIZE = 10;
+ private static final int DEFAULT_PACK_SIZE = 10;
+ /** A buffer to store all values of a page before flushing. We use long to unify all types. */
+ private interface ValueBuffer {
+ /** Adds a value to the buffer. */
+ void add(int value);
- /** A buffer to store all values of a page before flushing. We use long to unify all types. */
- private interface ValueBuffer {
- /** Adds a value to the buffer. */
- void add(int value);
- void add(long value);
- void add(float value);
- void add(double value);
+ void add(long value);
- /** Clears the internal buffer. */
- void clear();
+ void add(float value);
- /** Checks if the buffer is empty. */
- boolean isEmpty();
+ void add(double value);
- /** Gets the current size of the buffer. */
- int size();
+ /** Clears the internal buffer. */
+ void clear();
- ProcessingResult processAndGet();
- }
+ /** Checks if the buffer is empty. */
+ boolean isEmpty();
- private static class Int64Buffer implements ValueBuffer {
- private final List<Long> values = new ArrayList<>();
+ /** Gets the current size of the buffer. */
+ int size();
- @Override
- public void add(int value) { values.add((long) value); }
- @Override
- public void add(long value) { values.add(value); }
- @Override
- public void add(float value) { /* Do nothing, type mismatch */ }
- @Override
- public void add(double value) { /* Do nothing, type mismatch */ }
+ ProcessingResult processAndGet();
+ }
- @Override
-        public ProcessingResult processAndGet() { // <--- implements the new method
- long[] data = values.stream().mapToLong(l -> l).toArray();
- return new ProcessingResult(data, 0); // Exponent is 0 for integers
- }
+ private static class Int64Buffer implements ValueBuffer {
+ private final List<Long> values = new ArrayList<>();
- @Override
- public void clear() { values.clear(); }
- @Override
- public boolean isEmpty() { return values.isEmpty(); }
- @Override
- public int size() { return values.size(); }
- }
-
- /** A buffer for FLOAT and DOUBLE types. It performs scaling in processAndGetLongs(). */
- private static class DoubleBuffer implements ValueBuffer {
- private final List<Double> values = new ArrayList<>();
-
- @Override
- public void add(int value) { /* Do nothing, type mismatch */ }
- @Override
- public void add(long value) { /* Do nothing, type mismatch */ }
- @Override
- public void add(float value) { values.add((double) value); } // Store as double to unify
- @Override
- public void add(double value) { values.add(value); }
-
- @Override
- public ProcessingResult processAndGet() {
- // --- Edge Case: Handle empty buffer ---
- if (values.isEmpty()) {
- return new ProcessingResult(new long[0], 0);
- }
- int maxDecimalPlaces = 0;
- for (double v : values) {
- String s = BigDecimal.valueOf(v).toPlainString();
- int dotIndex = s.indexOf('.');
- if (dotIndex != -1) {
- int decimalPlaces = s.length() - dotIndex - 1;
- if (decimalPlaces > maxDecimalPlaces) {
- maxDecimalPlaces = decimalPlaces;
- }
- }
- }
-
- double scalingFactor = Math.pow(10, maxDecimalPlaces);
-
- long[] scaledLongs = new long[values.size()];
- for (int i = 0; i < values.size(); i++) {
- scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
- }
-
- return new ProcessingResult(scaledLongs, maxDecimalPlaces);
- }
-
- @Override
- public void clear() { values.clear(); }
- @Override
- public boolean isEmpty() { return values.isEmpty(); }
- @Override
- public int size() { return values.size(); }
- }
-
- private static class ProcessingResult {
- final long[] scaledLongs;
- final int scalingExponent; // e.g., 3 for a scaling factor of 1000
-
- ProcessingResult(long[] scaledLongs, int scalingExponent) {
- this.scaledLongs = scaledLongs;
- this.scalingExponent = scalingExponent;
- }
-
- long[] getScaledLongs(){
- return this.scaledLongs;
- }
-
- int getScalingExponent(){
- return this.scalingExponent;
- }
- }
-
-
- private final ValueBuffer buffer;
-
- /**
- * Constructor for AClusterEncoder. It's called by AClusterEncodingBuilder.
- *
- * @param dataType The data type of the time series, used for potential type-specific logic.
- */
- public AClusterEncoder(TSDataType dataType) {
- super(TSEncoding.ACLUSTER);
- switch (dataType) {
- case INT32: case INT64:
- this.buffer = new Int64Buffer();
- break;
- case FLOAT: case DOUBLE:
- this.buffer = new DoubleBuffer();
- break;
- default:
- throw new TsFileEncodingException("AClusterEncoder does not support data type: " + dataType);
- }
+ @Override
+ public void add(int value) {
+ values.add((long) value);
}
@Override
- public void encode(int value, ByteArrayOutputStream out) {
- buffer.add(value);
+ public void add(long value) {
+ values.add(value);
}
@Override
- public void encode(long value, ByteArrayOutputStream out) {
- buffer.add(value);
+ public void add(float value) {
+ /* Do nothing, type mismatch */
}
@Override
- public void encode(float value, ByteArrayOutputStream out) {
- buffer.add(value);
+ public void add(double value) {
+ /* Do nothing, type mismatch */
}
@Override
- public void encode(double value, ByteArrayOutputStream out) {
- buffer.add(value);
+    public ProcessingResult processAndGet() { // <--- implements the new method
+ long[] data = values.stream().mapToLong(l -> l).toArray();
+ return new ProcessingResult(data, 0); // Exponent is 0 for integers
}
@Override
- public void flush(ByteArrayOutputStream out) throws IOException {
- if (buffer.isEmpty()) {
- return;
- }
-
- ProcessingResult procResult = buffer.processAndGet();
- long[] originalData = procResult.getScaledLongs();
- int scalingExponent = procResult.getScalingExponent();
- if (originalData.length == 0) return;
- long minVal = findMin(originalData);
- long[] data = new long[originalData.length];
- for (int i = 0; i < data.length; i++) {
- data[i] = originalData[i] - minVal;
- }
-
- Object[] clusterResult = AClusterAlgorithm.run(data);
- long[] sortedMedoids = (long[]) clusterResult[0];
- int[] clusterAssignments = (int[]) clusterResult[1];
- long[] clusterFrequencies = (long[]) clusterResult[2];
-
- long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
-
- encodeResults(out, scalingExponent, minVal, sortedMedoids, clusterFrequencies, sortedZigzagResiduals);
-
- buffer.clear();
- }
-
- private static class MedoidFreqPair {
- long medoid;
- long frequency;
- int originalIndex;
-
- MedoidFreqPair(long medoid, long frequency, int originalIndex) {
- this.medoid = medoid;
- this.frequency = frequency;
- this.originalIndex = originalIndex;
- }
- }
-
- /**
- * A direct translation of your `residualCalculationZigzagNoHuff_sorted` logic.
- * It computes residuals, applies zigzag encoding, and groups them by cluster.
- */
- private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids, int[] assignments, long[] frequencies) {
- int n = data.length;
- int k = medoids.length;
- if (n == 0) return new long[0];
-
- long[] sortedResiduals = new long[n];
- int[] writePointers = new int[k];
- int cumulativeCount = 0;
- for (int i = 0; i < k; i++) {
- writePointers[i] = cumulativeCount;
- cumulativeCount += (int) frequencies[i];
- }
-
- for (int i = 0; i < n; i++) {
- int clusterId = assignments[i];
- long medoid = medoids[clusterId];
- long residual = data[i] - medoid;
- long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
-
- int targetIndex = writePointers[clusterId];
- sortedResiduals[targetIndex] = zigzagResidual;
- writePointers[clusterId]++;
- }
- return sortedResiduals;
- }
-
- private void encodeResults(
- ByteArrayOutputStream out,
- int scalingExponent,
- long minVal,
- long[] medoids,
- long[] frequencies,
- long[] residuals)
- throws IOException {
-
- ClusterSupport writer = new ClusterSupport(out);
- int numPoints = residuals.length;
- int k = medoids.length;
-
- writer.write(scalingExponent, 8);
- writer.write(k, 16);
- writer.write(numPoints, 16);
- writer.write(DEFAULT_PACK_SIZE, 16);
-
- int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
- writer.write(minValBit, 8);
- writer.write(minVal>=0?0:1,1);
- writer.write(minVal, minValBit);
-
- if (k == 0) {
- writer.flush();
- return;
- }
-
- long minMedoid = findMin(medoids);
- long[] medoidOffsets = new long[k];
- int maxMedoidOffsetBits = 0;
- for (int i = 0; i < k; i++) {
- medoidOffsets[i] = medoids[i] - minMedoid;
- maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
- }
- int minMedoidBit = ClusterSupport.bitsRequired(Math.abs(minMedoid));
- writer.write(minMedoidBit,8);
- writer.write(minMedoid>0?0:1,1);
- writer.write(minMedoid, minMedoidBit);
-
- writer.write(maxMedoidOffsetBits, 8);
- for (long offset : medoidOffsets) {
- writer.write(offset, maxMedoidOffsetBits);
- }
-
- long[] freqDeltas = new long[k];
- if (k > 0) {
- freqDeltas[0] = frequencies[0];
- for (int i = 1; i < k; i++) {
- freqDeltas[i] = frequencies[i] - frequencies[i - 1];
- }
- }
- int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
- writer.write(numFreqBlocks, 16);
-
- int[] freqBlockMaxBits = new int[numFreqBlocks];
- // Metadata pass for frequencies
- for (int i = 0; i < numFreqBlocks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, k);
- long maxDelta = 0;
- for (int j = start; j < end; j++) {
- maxDelta = Math.max(maxDelta, freqDeltas[j]);
- }
- freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
- writer.write(freqBlockMaxBits[i], 8);
- }
- // Data pass for frequencies
- for (int i = 0; i < numFreqBlocks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, k);
- int bitsForBlock = freqBlockMaxBits[i];
- for (int j = start; j < end; j++) {
- writer.write(freqDeltas[j], bitsForBlock);
- }
- }
-
- int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
- writer.write(numPacks, 32);
-
- int[] resPackMaxBits = new int[numPacks];
- // Metadata pass for residuals
- for (int i = 0; i < numPacks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
- long maxOffset = 0;
- for (int j = start; j < end; j++) {
- maxOffset = Math.max(maxOffset, residuals[j]);
- }
- resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
- writer.write(resPackMaxBits[i], 8);
- }
- // Data pass for residuals
- for (int i = 0; i < numPacks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
- int bitsForPack = resPackMaxBits[i];
- if (bitsForPack > 0) {
- for (int j = start; j < end; j++) {
- writer.write(residuals[j], bitsForPack);
- }
- }
- }
-
- writer.flush();
- }
-
- private long findMin(long[] data) {
- if (data == null || data.length == 0) {
- throw new IllegalArgumentException("Data array cannot be null or empty.");
- }
- long min = data[0];
- for (int i = 1; i < data.length; i++) {
- if (data[i] < min) {
- min = data[i];
- }
- }
- return min;
+ public void clear() {
+ values.clear();
}
@Override
- public int getOneItemMaxSize() {
- return 8;
+ public boolean isEmpty() {
+ return values.isEmpty();
}
@Override
- public long getMaxByteSize() {
- if (this.buffer.isEmpty()) {
- return 0;
+ public int size() {
+ return values.size();
+ }
+ }
+
+  /** A buffer for FLOAT and DOUBLE types. It performs scaling in processAndGet(). */
+ private static class DoubleBuffer implements ValueBuffer {
+ private final List<Double> values = new ArrayList<>();
+
+ @Override
+ public void add(int value) {
+ /* Do nothing, type mismatch */
+ }
+
+ @Override
+ public void add(long value) {
+ /* Do nothing, type mismatch */
+ }
+
+ @Override
+ public void add(float value) {
+      values.add((double) value); // Store as double to unify
+    }
+
+ @Override
+ public void add(double value) {
+ values.add(value);
+ }
+
+ @Override
+ public ProcessingResult processAndGet() {
+ // --- Edge Case: Handle empty buffer ---
+ if (values.isEmpty()) {
+ return new ProcessingResult(new long[0], 0);
+ }
+ int maxDecimalPlaces = 0;
+ for (double v : values) {
+ String s = BigDecimal.valueOf(v).toPlainString();
+ int dotIndex = s.indexOf('.');
+ if (dotIndex != -1) {
+ int decimalPlaces = s.length() - dotIndex - 1;
+ if (decimalPlaces > maxDecimalPlaces) {
+ maxDecimalPlaces = decimalPlaces;
+ }
}
- return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+ }
+
+ double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+ long[] scaledLongs = new long[values.size()];
+ for (int i = 0; i < values.size(); i++) {
+ scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+ }
+
+ return new ProcessingResult(scaledLongs, maxDecimalPlaces);
}
@Override
- public void encode(boolean value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
+ public void clear() {
+ values.clear();
}
@Override
- public void encode(short value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support short values.");
+ public boolean isEmpty() {
+ return values.isEmpty();
}
@Override
- public void encode(Binary value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
+ public int size() {
+ return values.size();
+ }
+ }
+
+ private static class ProcessingResult {
+ final long[] scaledLongs;
+ final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+ ProcessingResult(long[] scaledLongs, int scalingExponent) {
+ this.scaledLongs = scaledLongs;
+ this.scalingExponent = scalingExponent;
}
- @Override
- public void encode(BigDecimal value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
+ long[] getScaledLongs() {
+ return this.scaledLongs;
}
+
+ int getScalingExponent() {
+ return this.scalingExponent;
+ }
+ }
+
+ private final ValueBuffer buffer;
+
+ /**
+ * Constructor for AClusterEncoder. It's called by AClusterEncodingBuilder.
+ *
+ * @param dataType The data type of the time series, used for potential type-specific logic.
+ */
+ public AClusterEncoder(TSDataType dataType) {
+ super(TSEncoding.ACLUSTER);
+ switch (dataType) {
+ case INT32:
+ case INT64:
+ this.buffer = new Int64Buffer();
+ break;
+ case FLOAT:
+ case DOUBLE:
+ this.buffer = new DoubleBuffer();
+ break;
+ default:
+ throw new TsFileEncodingException(
+ "AClusterEncoder does not support data type: " + dataType);
+ }
+ }
+
+ @Override
+ public void encode(int value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(long value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(float value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(double value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void flush(ByteArrayOutputStream out) throws IOException {
+ if (buffer.isEmpty()) {
+ return;
+ }
+
+ ProcessingResult procResult = buffer.processAndGet();
+ long[] originalData = procResult.getScaledLongs();
+ int scalingExponent = procResult.getScalingExponent();
+ if (originalData.length == 0) return;
+ long minVal = findMin(originalData);
+ long[] data = new long[originalData.length];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = originalData[i] - minVal;
+ }
+
+ Object[] clusterResult = AClusterAlgorithm.run(data);
+ long[] sortedMedoids = (long[]) clusterResult[0];
+ int[] clusterAssignments = (int[]) clusterResult[1];
+ long[] clusterFrequencies = (long[]) clusterResult[2];
+
+ long[] sortedZigzagResiduals =
+ calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
+
+ encodeResults(
+ out, scalingExponent, minVal, sortedMedoids, clusterFrequencies, sortedZigzagResiduals);
+
+ buffer.clear();
+ }
+
+ private static class MedoidFreqPair {
+ long medoid;
+ long frequency;
+ int originalIndex;
+
+ MedoidFreqPair(long medoid, long frequency, int originalIndex) {
+ this.medoid = medoid;
+ this.frequency = frequency;
+ this.originalIndex = originalIndex;
+ }
+ }
+
+ /**
+   * A direct translation of the original `residualCalculationZigzagNoHuff_sorted` logic. It computes
+ * residuals, applies zigzag encoding, and groups them by cluster.
+ */
+ private long[] calculateSortedZigzagResiduals(
+ long[] data, long[] medoids, int[] assignments, long[] frequencies) {
+ int n = data.length;
+ int k = medoids.length;
+ if (n == 0) return new long[0];
+
+ long[] sortedResiduals = new long[n];
+ int[] writePointers = new int[k];
+ int cumulativeCount = 0;
+ for (int i = 0; i < k; i++) {
+ writePointers[i] = cumulativeCount;
+ cumulativeCount += (int) frequencies[i];
+ }
+
+ for (int i = 0; i < n; i++) {
+ int clusterId = assignments[i];
+ long medoid = medoids[clusterId];
+ long residual = data[i] - medoid;
+ long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
+
+ int targetIndex = writePointers[clusterId];
+ sortedResiduals[targetIndex] = zigzagResidual;
+ writePointers[clusterId]++;
+ }
+ return sortedResiduals;
+ }
+
+ private void encodeResults(
+ ByteArrayOutputStream out,
+ int scalingExponent,
+ long minVal,
+ long[] medoids,
+ long[] frequencies,
+ long[] residuals)
+ throws IOException {
+
+ ClusterSupport writer = new ClusterSupport(out);
+ int numPoints = residuals.length;
+ int k = medoids.length;
+
+ writer.write(scalingExponent, 8);
+ writer.write(k, 16);
+ writer.write(numPoints, 16);
+ writer.write(DEFAULT_PACK_SIZE, 16);
+
+ int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
+ writer.write(minValBit, 8);
+ writer.write(minVal >= 0 ? 0 : 1, 1);
+ writer.write(minVal, minValBit);
+
+ if (k == 0) {
+ writer.flush();
+ return;
+ }
+
+ long minMedoid = findMin(medoids);
+ long[] medoidOffsets = new long[k];
+ int maxMedoidOffsetBits = 0;
+ for (int i = 0; i < k; i++) {
+ medoidOffsets[i] = medoids[i] - minMedoid;
+ maxMedoidOffsetBits =
+ Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
+ }
+ int minMedoidBit = ClusterSupport.bitsRequired(Math.abs(minMedoid));
+ writer.write(minMedoidBit, 8);
+ writer.write(minMedoid > 0 ? 0 : 1, 1);
+ writer.write(minMedoid, minMedoidBit);
+
+ writer.write(maxMedoidOffsetBits, 8);
+ for (long offset : medoidOffsets) {
+ writer.write(offset, maxMedoidOffsetBits);
+ }
+
+ long[] freqDeltas = new long[k];
+ if (k > 0) {
+ freqDeltas[0] = frequencies[0];
+ for (int i = 1; i < k; i++) {
+ freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+ }
+ }
+ int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numFreqBlocks, 16);
+
+ int[] freqBlockMaxBits = new int[numFreqBlocks];
+ // Metadata pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ long maxDelta = 0;
+ for (int j = start; j < end; j++) {
+ maxDelta = Math.max(maxDelta, freqDeltas[j]);
+ }
+ freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+ writer.write(freqBlockMaxBits[i], 8);
+ }
+ // Data pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ int bitsForBlock = freqBlockMaxBits[i];
+ for (int j = start; j < end; j++) {
+ writer.write(freqDeltas[j], bitsForBlock);
+ }
+ }
+
+ int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numPacks, 32);
+
+ int[] resPackMaxBits = new int[numPacks];
+ // Metadata pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ long maxOffset = 0;
+ for (int j = start; j < end; j++) {
+ maxOffset = Math.max(maxOffset, residuals[j]);
+ }
+ resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+ writer.write(resPackMaxBits[i], 8);
+ }
+ // Data pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ int bitsForPack = resPackMaxBits[i];
+ if (bitsForPack > 0) {
+ for (int j = start; j < end; j++) {
+ writer.write(residuals[j], bitsForPack);
+ }
+ }
+ }
+
+ writer.flush();
+ }
+
+ private long findMin(long[] data) {
+ if (data == null || data.length == 0) {
+ throw new IllegalArgumentException("Data array cannot be null or empty.");
+ }
+ long min = data[0];
+ for (int i = 1; i < data.length; i++) {
+ if (data[i] < min) {
+ min = data[i];
+ }
+ }
+ return min;
+ }
+
+ @Override
+ public int getOneItemMaxSize() {
+ return 8;
+ }
+
+ @Override
+ public long getMaxByteSize() {
+ if (this.buffer.isEmpty()) {
+ return 0;
+ }
+ return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+ }
+
+ @Override
+ public void encode(boolean value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
+ }
+
+ @Override
+ public void encode(short value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support short values.");
+ }
+
+ @Override
+ public void encode(Binary value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
+ }
+
+ @Override
+ public void encode(BigDecimal value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
+ }
}
-
-
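For orientation, the fixed header that flush()/encodeResults() emits before the medoid, frequency-delta, and residual sections, reproduced as a minimal sketch with placeholder values (the remaining sections follow the same ClusterSupport.write pattern shown above):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.tsfile.encoding.encoder.ClusterSupport;

public class AClusterHeaderSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ClusterSupport writer = new ClusterSupport(out);
    int scalingExponent = 2, k = 3, numPoints = 100, packSize = 10;
    long minVal = 42;
    writer.write(scalingExponent, 8);     // 8-bit scaling exponent
    writer.write(k, 16);                  // 16-bit medoid count
    writer.write(numPoints, 16);          // 16-bit point count
    writer.write(packSize, 16);           // 16-bit pack size
    int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
    writer.write(minValBit, 8);           // width of the minVal field
    writer.write(minVal >= 0 ? 0 : 1, 1); // sign bit
    writer.write(minVal, minValBit);      // minVal itself
    writer.flush();
    System.out.println(out.size() + " header bytes"); // 9 header bytes
  }
}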
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
index 6a189a9..32352c8 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.tsfile.encoding.encoder;
import java.io.ByteArrayOutputStream;
@@ -5,73 +24,74 @@
public class ClusterSupport {
- private final ByteArrayOutputStream out;
- private byte currentByte;
- private int bitPosition; // 0-7, from left to right (MSB to LSB)
+ private final ByteArrayOutputStream out;
+ private byte currentByte;
+ private int bitPosition; // 0-7, from left to right (MSB to LSB)
- public ClusterSupport(ByteArrayOutputStream out) {
- this.out = out;
- this.currentByte = 0;
- this.bitPosition = 0;
- }
+ public ClusterSupport(ByteArrayOutputStream out) {
+ this.out = out;
+ this.currentByte = 0;
+ this.bitPosition = 0;
+ }
- /**
- * Writes a value using a specified number of bits.
- *
- * @param value The long value to write. Only the lower `numBits` will be used.
- * @param numBits The number of bits to write for the value (must be > 0 and <= 64).
- * @throws IOException If an I/O error occurs.
- */
- public void write(long value, int numBits) throws IOException {
- if (numBits <= 0 || numBits > 64) {
- throw new IllegalArgumentException("Number of bits must be between 1 and 64.");
- }
- for (int i = numBits - 1; i >= 0; i--) {
- // Get the i-th bit from the value
- boolean bit = ((value >> i) & 1) == 1;
- writeBit(bit);
- }
+ /**
+ * Writes a value using a specified number of bits.
+ *
+ * @param value The long value to write. Only the lower `numBits` will be used.
+ * @param numBits The number of bits to write for the value (must be > 0 and <= 64).
+ * @throws IOException If an I/O error occurs.
+ */
+ public void write(long value, int numBits) throws IOException {
+ if (numBits <= 0 || numBits > 64) {
+ throw new IllegalArgumentException("Number of bits must be between 1 and 64.");
}
+ for (int i = numBits - 1; i >= 0; i--) {
+ // Get the i-th bit from the value
+ boolean bit = ((value >> i) & 1) == 1;
+ writeBit(bit);
+ }
+ }
- private void writeBit(boolean bit) throws IOException {
- if (bit) {
- currentByte |= (1 << (7 - bitPosition));
- }
- bitPosition++;
- if (bitPosition == 8) {
- out.write(currentByte);
- currentByte = 0;
- bitPosition = 0;
- }
+ private void writeBit(boolean bit) throws IOException {
+ if (bit) {
+ currentByte |= (1 << (7 - bitPosition));
}
+ bitPosition++;
+ if (bitPosition == 8) {
+ out.write(currentByte);
+ currentByte = 0;
+ bitPosition = 0;
+ }
+ }
- /**
- * Flushes any remaining bits in the current byte to the output stream.
- * This must be called at the end to ensure all data is written.
- * @throws IOException If an I/O error occurs.
- */
- public void flush() throws IOException {
- if (bitPosition > 0) {
- out.write(currentByte);
- }
- // It's good practice to reset, though not strictly necessary if the instance is discarded.
- currentByte = 0;
- bitPosition = 0;
+ /**
+ * Flushes any remaining bits in the current byte to the output stream. This must be called at the
+ * end to ensure all data is written.
+ *
+ * @throws IOException If an I/O error occurs.
+ */
+ public void flush() throws IOException {
+ if (bitPosition > 0) {
+ out.write(currentByte);
}
+ // It's good practice to reset, though not strictly necessary if the instance is discarded.
+ currentByte = 0;
+ bitPosition = 0;
+ }
- /**
- * A helper to calculate the number of bits required for a non-negative long value.
- *
- * @param value The non-negative value.
- * @return The number of bits required to represent it.
- */
- public static int bitsRequired(long value) {
- if (value < 0) {
- throw new IllegalArgumentException("Value must be non-negative.");
- }
- if (value == 0) {
- return 1;
- }
- return 64 - Long.numberOfLeadingZeros(value);
+ /**
+ * A helper to calculate the number of bits required for a non-negative long value.
+ *
+ * @param value The non-negative value.
+ * @return The number of bits required to represent it.
+ */
+ public static int bitsRequired(long value) {
+ if (value < 0) {
+ throw new IllegalArgumentException("Value must be non-negative.");
}
+ if (value == 0) {
+ return 1;
+ }
+ return 64 - Long.numberOfLeadingZeros(value);
+ }
}
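ClusterSupport and ClusterReader are mirror images (both MSB-first within each byte), so any field written with n bits reads back unchanged once the final byte has been flushed. A minimal round-trip sketch:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.tsfile.encoding.decoder.ClusterReader;
import org.apache.tsfile.encoding.encoder.ClusterSupport;

public class BitStreamRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ClusterSupport writer = new ClusterSupport(out);
    writer.write(42, 7); // 7-bit field
    writer.write(5, 3);  // 3-bit field
    writer.flush();      // pads the last byte with zero bits

    ClusterReader reader = new ClusterReader(ByteBuffer.wrap(out.toByteArray()));
    System.out.println(reader.read(7)); // 42
    System.out.println(reader.read(3)); // 5
  }
}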
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
index 53016aa..1288244 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
@@ -1,278 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.tsfile.encoding.encoder;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
public class KClusterAlgorithm {
- private static final Random rand = new Random();
+ private static final Random rand = new Random();
- /**
- * Private constructor to prevent instantiation.
- */
- private KClusterAlgorithm() {}
+ /** Private constructor to prevent instantiation. */
+ private KClusterAlgorithm() {}
- public static Object[] run(long[] data, int k) {
- if (data == null || data.length == 0) {
- return new Object[] {new long[0], new int[0], new long[0]};
- }
-
- long[][] data2D = new long[data.length][1];
- for (int i = 0; i < data.length; i++) {
- data2D[i][0] = data[i];
- }
-
- return kMedoidLogCost(data, k, 2, 0.01);
+ public static Object[] run(long[] data, int k) {
+ if (data == null || data.length == 0) {
+ return new Object[] {new long[0], new int[0], new long[0]};
}
- /**
- * Helper class for sorting medoids based on their cluster size (frequency).
- */
- private static class MedoidSortHelper implements Comparable<KClusterAlgorithm.MedoidSortHelper> {
- long medoid;
- long size;
- int originalIndex;
-
- MedoidSortHelper(long medoid, long size, int originalIndex) {
- this.medoid = medoid;
- this.size = size;
- this.originalIndex = originalIndex;
- }
-
- @Override
- public int compareTo(KClusterAlgorithm.MedoidSortHelper other) {
- return Long.compare(this.size, other.size);
- }
+ long[][] data2D = new long[data.length][1];
+ for (int i = 0; i < data.length; i++) {
+ data2D[i][0] = data[i];
}
- /**
- * The core K-Medoids algorithm, specifically implemented for 1D data.
- */
- private static Object[] kMedoidLogCost(long[] data, int k, int maxIter, double tol) {
- int n = data.length;
+ return kMedoidLogCost(data, k, 2, 0.01);
+ }
- // Use a HashSet for efficient uniqueness checking on primitive longs.
- Set<Long> uniquePoints = new HashSet<>();
- for (long point : data) {
- uniquePoints.add(point);
- if (uniquePoints.size() > k) {
- break;
- }
- }
- int distinctCount = uniquePoints.size();
- if (distinctCount < k) {
- System.err.println("Warning: Distinct data points (" + distinctCount +
- ") is less than input k (" + k + "), setting k to " + distinctCount);
- k = distinctCount;
- }
+ /** Helper class for sorting medoids based on their cluster size (frequency). */
+ private static class MedoidSortHelper implements Comparable<KClusterAlgorithm.MedoidSortHelper> {
+ long medoid;
+ long size;
+ int originalIndex;
- if (k <= 0) {
- return new Object[]{new long[0], new int[n], new long[0]};
- }
-
- // 1. Initialize medoids using a K-Medoids++ style approach.
- long[] medoids = acceleratedInitialization(data, k);
- int[] clusterAssignment = new int[n];
- long previousTotalCost = Long.MAX_VALUE;
-
- // 2. Main iterative loop (Build and Swap phases).
- for (int iteration = 0; iteration < maxIter; iteration++) {
- // --- Assignment Step ---
- long totalCostThisRound = 0L;
- for (int i = 0; i < n; i++) {
- long minCost = Long.MAX_VALUE;
- int assignedMedoidIndex = -1;
- for (int m = 0; m < k; m++) {
- long cost = calculateResidualCost(data[i], medoids[m]);
- if (cost < minCost) {
- minCost = cost;
- assignedMedoidIndex = m;
- if (minCost == 0) break; // Optimization: A perfect match is unbeatable.
- }
- }
- clusterAssignment[i] = assignedMedoidIndex;
- totalCostThisRound += minCost;
- }
-
- // --- Convergence Check (Cost) ---
- if (iteration > 0 && (previousTotalCost - totalCostThisRound) < tol) {
- break;
- }
- previousTotalCost = totalCostThisRound;
-
- // --- Update Step ---
- long[] newMedoids = updateMedoids(data, clusterAssignment, k);
-
- // --- Convergence Check (Medoids) ---
- if (Arrays.equals(medoids, newMedoids)) {
- break;
- }
- medoids = newMedoids;
- }
-
- // 3. Calculate final cluster sizes and sort results.
- long[] finalClusterSizes = new long[k];
- for (int assignment : clusterAssignment) {
- if (assignment != -1) finalClusterSizes[assignment]++;
- }
- return sortResults(medoids, clusterAssignment, finalClusterSizes);
+ MedoidSortHelper(long medoid, long size, int originalIndex) {
+ this.medoid = medoid;
+ this.size = size;
+ this.originalIndex = originalIndex;
}
- /**
- * K-Medoids++ style initialization for 1D data.
- */
- private static long[] acceleratedInitialization(long[] data, int k) {
- long[] medoids = new long[k];
- Set<Long> selectedMedoids = new HashSet<>();
+ @Override
+ public int compareTo(KClusterAlgorithm.MedoidSortHelper other) {
+ return Long.compare(this.size, other.size);
+ }
+ }
- int firstIndex = rand.nextInt(data.length);
- medoids[0] = data[firstIndex];
- selectedMedoids.add(medoids[0]);
+ /** The core K-Medoids algorithm, specifically implemented for 1D data. */
+ private static Object[] kMedoidLogCost(long[] data, int k, int maxIter, double tol) {
+ int n = data.length;
- long[] distances = new long[data.length];
- for (int i = 0; i < data.length; i++) {
- distances[i] = Math.abs(data[i] - medoids[0]);
- }
-
- for (int i = 1; i < k; i++) {
- long[] prefixSums = new long[data.length];
- prefixSums[0] = distances[0];
- for (int p = 1; p < data.length; p++) {
- prefixSums[p] = prefixSums[p - 1] + distances[p];
- }
- long totalDistance = prefixSums[data.length - 1];
- if (totalDistance == 0) {
- int idx = rand.nextInt(data.length);
- while (selectedMedoids.contains(data[idx])) {
- idx = (idx + 1) % data.length;
- }
- medoids[i] = data[idx];
- selectedMedoids.add(medoids[i]);
- continue;
- }
-
- long randValue = (long) (rand.nextDouble() * totalDistance);
- int chosenIdx = binarySearch(prefixSums, randValue);
-
- while (selectedMedoids.contains(data[chosenIdx])) {
- chosenIdx = (chosenIdx + 1) % data.length;
- }
- medoids[i] = data[chosenIdx];
- selectedMedoids.add(medoids[i]);
-
- for (int idx = 0; idx < data.length; idx++) {
- long distNewMedoid = Math.abs(data[idx] - medoids[i]);
- if (distNewMedoid < distances[idx]) {
- distances[idx] = distNewMedoid;
- }
- }
- }
- return medoids;
+ // Use a HashSet for efficient uniqueness checking on primitive longs.
+ Set<Long> uniquePoints = new HashSet<>();
+ for (long point : data) {
+ uniquePoints.add(point);
+ if (uniquePoints.size() > k) {
+ break;
+ }
+ }
+ int distinctCount = uniquePoints.size();
+ if (distinctCount < k) {
+ System.err.println(
+          "Warning: Number of distinct data points ("
+ + distinctCount
+ + ") is less than input k ("
+ + k
+ + "), setting k to "
+ + distinctCount);
+ k = distinctCount;
}
- /**
- * Updates medoids by finding the point within each cluster that minimizes the total intra-cluster cost.
- */
- private static long[] updateMedoids(long[] data, int[] clusterAssignment, int k) {
- long[] newMedoids = new long[k];
- List<Long>[] clusterPoints = new ArrayList[k];
- for (int i = 0; i < k; i++) {
- clusterPoints[i] = new ArrayList<>();
- }
- for (int i = 0; i < data.length; i++) {
- if (clusterAssignment[i] != -1) {
- clusterPoints[clusterAssignment[i]].add(data[i]);
- }
- }
+ if (k <= 0) {
+ return new Object[] {new long[0], new int[n], new long[0]};
+ }
+ // 1. Initialize medoids using a K-Medoids++ style approach.
+ long[] medoids = acceleratedInitialization(data, k);
+ int[] clusterAssignment = new int[n];
+ long previousTotalCost = Long.MAX_VALUE;
+
+ // 2. Main iterative loop (Build and Swap phases).
+ for (int iteration = 0; iteration < maxIter; iteration++) {
+ // --- Assignment Step ---
+ long totalCostThisRound = 0L;
+ for (int i = 0; i < n; i++) {
+ long minCost = Long.MAX_VALUE;
+ int assignedMedoidIndex = -1;
for (int m = 0; m < k; m++) {
- List<Long> members = clusterPoints[m];
- if (members.isEmpty()) continue;
-
- long minTotalClusterCost = Long.MAX_VALUE;
- long newMedoid = members.get(0);
-
- for (Long candidate : members) {
- long currentCandidateTotalCost = 0L;
- for (Long otherMember : members) {
- currentCandidateTotalCost += calculateResidualCost(candidate, otherMember);
- }
- if (currentCandidateTotalCost < minTotalClusterCost) {
- minTotalClusterCost = currentCandidateTotalCost;
- newMedoid = candidate;
- }
- }
- newMedoids[m] = newMedoid;
+ long cost = calculateResidualCost(data[i], medoids[m]);
+ if (cost < minCost) {
+ minCost = cost;
+ assignedMedoidIndex = m;
+ if (minCost == 0) break; // Optimization: A perfect match is unbeatable.
+ }
}
- return newMedoids;
+ clusterAssignment[i] = assignedMedoidIndex;
+ totalCostThisRound += minCost;
+ }
+
+ // --- Convergence Check (Cost) ---
+ if (iteration > 0 && (previousTotalCost - totalCostThisRound) < tol) {
+ break;
+ }
+ previousTotalCost = totalCostThisRound;
+
+ // --- Update Step ---
+ long[] newMedoids = updateMedoids(data, clusterAssignment, k);
+
+ // --- Convergence Check (Medoids) ---
+ if (Arrays.equals(medoids, newMedoids)) {
+ break;
+ }
+ medoids = newMedoids;
}
- /**
- * Sorts the final medoids and their cluster information based on cluster frequency.
- *
- * @param medoids The discovered medoids.
- * @param clusterAssignment The assignment map for each data point.
- * @param clusterSize The frequency of each cluster.
- * @return A sorted and correctly mapped Object array.
- */
- private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
- int k = medoids.length;
- List<KClusterAlgorithm.MedoidSortHelper> sorters = new ArrayList<>();
- for (int i = 0; i < k; i++) {
- sorters.add(new KClusterAlgorithm.MedoidSortHelper(medoids[i], clusterSize[i], i));
- }
- Collections.sort(sorters);
+ // 3. Calculate final cluster sizes and sort results.
+ long[] finalClusterSizes = new long[k];
+ for (int assignment : clusterAssignment) {
+ if (assignment != -1) finalClusterSizes[assignment]++;
+ }
+ return sortResults(medoids, clusterAssignment, finalClusterSizes);
+ }
- long[] sortedMedoids = new long[k];
- long[] sortedClusterSize = new long[k];
- int[] oldToNewIndexMap = new int[k];
+ /** K-Medoids++ style initialization for 1D data. */
+ private static long[] acceleratedInitialization(long[] data, int k) {
+ long[] medoids = new long[k];
+ Set<Long> selectedMedoids = new HashSet<>();
- for (int i = 0; i < k; i++) {
- KClusterAlgorithm.MedoidSortHelper sortedItem = sorters.get(i);
- sortedMedoids[i] = sortedItem.medoid;
- sortedClusterSize[i] = sortedItem.size;
- oldToNewIndexMap[sortedItem.originalIndex] = i;
- }
+ int firstIndex = rand.nextInt(data.length);
+ medoids[0] = data[firstIndex];
+ selectedMedoids.add(medoids[0]);
- int[] sortedClusterAssignment = new int[clusterAssignment.length];
- for (int i = 0; i < clusterAssignment.length; i++) {
- int oldIndex = clusterAssignment[i];
- sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
- }
-
- return new Object[]{sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+ long[] distances = new long[data.length];
+ for (int i = 0; i < data.length; i++) {
+ distances[i] = Math.abs(data[i] - medoids[0]);
}
- // --- Cost Calculation Functions ---
-
- private static long bitLengthCost(long value) {
- if (value == 0) return 1;
- return 64 - Long.numberOfLeadingZeros(value);
- }
-
- private static long calculateResidualCost(long p1, long p2) {
- return 1 + bitLengthCost(Math.abs(p1 - p2));
- }
-
- private static int binarySearch(long[] prefixSums, long value) {
- int low = 0;
- int high = prefixSums.length - 1;
- int ans = -1;
- while (low <= high) {
- int mid = low + (high - low) / 2;
- if (prefixSums[mid] >= value) {
- ans = mid;
- high = mid - 1;
- } else {
- low = mid + 1;
- }
+ for (int i = 1; i < k; i++) {
+ long[] prefixSums = new long[data.length];
+ prefixSums[0] = distances[0];
+ for (int p = 1; p < data.length; p++) {
+ prefixSums[p] = prefixSums[p - 1] + distances[p];
+ }
+ long totalDistance = prefixSums[data.length - 1];
+ if (totalDistance == 0) {
+ int idx = rand.nextInt(data.length);
+ while (selectedMedoids.contains(data[idx])) {
+ idx = (idx + 1) % data.length;
}
- return ans == -1 ? prefixSums.length - 1 : ans;
+ medoids[i] = data[idx];
+ selectedMedoids.add(medoids[i]);
+ continue;
+ }
+
+ long randValue = (long) (rand.nextDouble() * totalDistance);
+ int chosenIdx = binarySearch(prefixSums, randValue);
+
+ while (selectedMedoids.contains(data[chosenIdx])) {
+ chosenIdx = (chosenIdx + 1) % data.length;
+ }
+ medoids[i] = data[chosenIdx];
+ selectedMedoids.add(medoids[i]);
+
+ for (int idx = 0; idx < data.length; idx++) {
+ long distNewMedoid = Math.abs(data[idx] - medoids[i]);
+ if (distNewMedoid < distances[idx]) {
+ distances[idx] = distNewMedoid;
+ }
+ }
+ }
+ return medoids;
+ }
+
+ /**
+ * Updates medoids by finding the point within each cluster that minimizes the total intra-cluster
+ * cost.
+ */
+  @SuppressWarnings("unchecked") // generic array creation below
+  private static long[] updateMedoids(long[] data, int[] clusterAssignment, int k) {
+ long[] newMedoids = new long[k];
+ List<Long>[] clusterPoints = new ArrayList[k];
+ for (int i = 0; i < k; i++) {
+ clusterPoints[i] = new ArrayList<>();
+ }
+ for (int i = 0; i < data.length; i++) {
+ if (clusterAssignment[i] != -1) {
+ clusterPoints[clusterAssignment[i]].add(data[i]);
+ }
}
+ for (int m = 0; m < k; m++) {
+ List<Long> members = clusterPoints[m];
+ if (members.isEmpty()) continue;
+
+ long minTotalClusterCost = Long.MAX_VALUE;
+ long newMedoid = members.get(0);
+
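+      // Try every member as the candidate medoid and keep the one with the
+      // lowest total residual cost to all other members of the cluster.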
+ for (Long candidate : members) {
+ long currentCandidateTotalCost = 0L;
+ for (Long otherMember : members) {
+ currentCandidateTotalCost += calculateResidualCost(candidate, otherMember);
+ }
+ if (currentCandidateTotalCost < minTotalClusterCost) {
+ minTotalClusterCost = currentCandidateTotalCost;
+ newMedoid = candidate;
+ }
+ }
+ newMedoids[m] = newMedoid;
+ }
+ return newMedoids;
+ }
+
+ /**
+ * Sorts the final medoids and their cluster information based on cluster frequency.
+ *
+ * @param medoids The discovered medoids.
+ * @param clusterAssignment The assignment map for each data point.
+ * @param clusterSize The frequency of each cluster.
+ * @return A sorted and correctly mapped Object array.
+ */
+ private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
+ int k = medoids.length;
+ List<KClusterAlgorithm.MedoidSortHelper> sorters = new ArrayList<>();
+ for (int i = 0; i < k; i++) {
+ sorters.add(new KClusterAlgorithm.MedoidSortHelper(medoids[i], clusterSize[i], i));
+ }
+ Collections.sort(sorters);
+
+ long[] sortedMedoids = new long[k];
+ long[] sortedClusterSize = new long[k];
+ int[] oldToNewIndexMap = new int[k];
+
+ for (int i = 0; i < k; i++) {
+ KClusterAlgorithm.MedoidSortHelper sortedItem = sorters.get(i);
+ sortedMedoids[i] = sortedItem.medoid;
+ sortedClusterSize[i] = sortedItem.size;
+ oldToNewIndexMap[sortedItem.originalIndex] = i;
+ }
+
+ int[] sortedClusterAssignment = new int[clusterAssignment.length];
+ for (int i = 0; i < clusterAssignment.length; i++) {
+ int oldIndex = clusterAssignment[i];
+ sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
+ }
+
+ return new Object[] {sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+ }
+
+ // --- Cost Calculation Functions ---
+
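+  /** Bit length of a non-negative value; returns 1 for zero. */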
+ private static long bitLengthCost(long value) {
+ if (value == 0) return 1;
+ return 64 - Long.numberOfLeadingZeros(value);
+ }
+
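+  /** Cost in bits of storing p2 relative to p1: one extra bit plus the bit length of |p1 - p2|. */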
+ private static long calculateResidualCost(long p1, long p2) {
+ return 1 + bitLengthCost(Math.abs(p1 - p2));
+ }
+
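+  /** Returns the smallest index whose prefix sum is at least {@code value}, or the last index if none. */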
+ private static int binarySearch(long[] prefixSums, long value) {
+ int low = 0;
+ int high = prefixSums.length - 1;
+ int ans = -1;
+ while (low <= high) {
+ int mid = low + (high - low) / 2;
+ if (prefixSums[mid] >= value) {
+ ans = mid;
+ high = mid - 1;
+ } else {
+ low = mid + 1;
+ }
+ }
+ return ans == -1 ? prefixSums.length - 1 : ans;
+ }
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
index c24e90a..c8b347b 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.tsfile.encoding.encoder;
import org.apache.tsfile.enums.TSDataType;
@@ -9,373 +28,417 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-public class KClusterEncoder extends Encoder{
+public class KClusterEncoder extends Encoder {
- private static final int DEFAULT_PACK_SIZE = 10;
- private final int k;
- private long[] scaledData;
- private int scalingExponent;
- private final ValueBuffer buffer;
+ private static final int DEFAULT_PACK_SIZE = 10;
+ private final int k;
+ private long[] scaledData;
+ private int scalingExponent;
+ private final ValueBuffer buffer;
- // MODIFIED: Constructor takes 'k'
- public KClusterEncoder(TSDataType dataType, int k) {
- super(TSEncoding.KCLUSTER);
- this.k = k;
- switch (dataType) {
- case INT32: case INT64:
- this.buffer = new Int64Buffer();
- break;
- case FLOAT: case DOUBLE:
- this.buffer = new DoubleBuffer();
- break;
- default:
- throw new TsFileEncodingException("AClusterEncoder does not support data type: " + dataType);
- }
+ // MODIFIED: Constructor takes 'k'
+ public KClusterEncoder(TSDataType dataType, int k) {
+ super(TSEncoding.KCLUSTER);
+ this.k = k;
+ switch (dataType) {
+ case INT32:
+ case INT64:
+ this.buffer = new Int64Buffer();
+ break;
+ case FLOAT:
+ case DOUBLE:
+ this.buffer = new DoubleBuffer();
+ break;
+ default:
+ throw new TsFileEncodingException(
+            "KClusterEncoder does not support data type: " + dataType);
}
+ }
+ /** A buffer to store all values of a page before flushing. We use long to unify all types. */
+ private interface ValueBuffer {
+ /** Adds a value to the buffer. */
+ void add(int value);
- /** A buffer to store all values of a page before flushing. We use long to unify all types. */
- private interface ValueBuffer {
- /** Adds a value to the buffer. */
- void add(int value);
- void add(long value);
- void add(float value);
- void add(double value);
+ void add(long value);
- /** Clears the internal buffer. */
- void clear();
+ void add(float value);
- /** Checks if the buffer is empty. */
- boolean isEmpty();
+ void add(double value);
- /** Gets the current size of the buffer. */
- int size();
+ /** Clears the internal buffer. */
+ void clear();
- KClusterEncoder.ProcessingResult processAndGet();
- }
+ /** Checks if the buffer is empty. */
+ boolean isEmpty();
- private static class Int64Buffer implements KClusterEncoder.ValueBuffer {
- private final List<Long> values = new ArrayList<>();
+ /** Gets the current size of the buffer. */
+ int size();
- @Override
- public void add(int value) { values.add((long) value); }
- @Override
- public void add(long value) { values.add(value); }
- @Override
- public void add(float value) { /* Do nothing, type mismatch */ }
- @Override
- public void add(double value) { /* Do nothing, type mismatch */ }
+ KClusterEncoder.ProcessingResult processAndGet();
+ }
- @Override
- public KClusterEncoder.ProcessingResult processAndGet() { // <--- 实现新方法
- long[] data = values.stream().mapToLong(l -> l).toArray();
- return new KClusterEncoder.ProcessingResult(data, 0); // Exponent is 0 for integers
- }
-
- @Override
- public void clear() { values.clear(); }
- @Override
- public boolean isEmpty() { return values.isEmpty(); }
- @Override
- public int size() { return values.size(); }
- }
-
- /** A buffer for FLOAT and DOUBLE types. It performs scaling in processAndGetLongs(). */
- private static class DoubleBuffer implements KClusterEncoder.ValueBuffer {
- private final List<Double> values = new ArrayList<>();
-
- @Override
- public void add(int value) { /* Do nothing, type mismatch */ }
- @Override
- public void add(long value) { /* Do nothing, type mismatch */ }
- @Override
- public void add(float value) { values.add((double) value); } // Store as double to unify
- @Override
- public void add(double value) { values.add(value); }
-
- @Override
- public KClusterEncoder.ProcessingResult processAndGet() {
- // --- Edge Case: Handle empty buffer ---
- if (values.isEmpty()) {
- return new KClusterEncoder.ProcessingResult(new long[0], 0);
- }
- int maxDecimalPlaces = 0;
- for (double v : values) {
- String s = BigDecimal.valueOf(v).toPlainString();
- int dotIndex = s.indexOf('.');
- if (dotIndex != -1) {
- int decimalPlaces = s.length() - dotIndex - 1;
- if (decimalPlaces > maxDecimalPlaces) {
- maxDecimalPlaces = decimalPlaces;
- }
- }
- }
-
- double scalingFactor = Math.pow(10, maxDecimalPlaces);
-
- long[] scaledLongs = new long[values.size()];
- for (int i = 0; i < values.size(); i++) {
- scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
- }
-
- return new KClusterEncoder.ProcessingResult(scaledLongs, maxDecimalPlaces);
- }
-
- @Override
- public void clear() { values.clear(); }
- @Override
- public boolean isEmpty() { return values.isEmpty(); }
- @Override
- public int size() { return values.size(); }
- }
-
- private static class ProcessingResult {
- final long[] scaledLongs;
- final int scalingExponent; // e.g., 3 for a scaling factor of 1000
-
- ProcessingResult(long[] scaledLongs, int scalingExponent) {
- this.scaledLongs = scaledLongs;
- this.scalingExponent = scalingExponent;
- }
-
- long[] getScaledLongs(){
- return this.scaledLongs;
- }
-
- int getScalingExponent(){
- return this.scalingExponent;
- }
- }
-
+ private static class Int64Buffer implements KClusterEncoder.ValueBuffer {
+ private final List<Long> values = new ArrayList<>();
@Override
- public void flush(ByteArrayOutputStream out) throws IOException {
- if (buffer.isEmpty()) {
- return;
- }
-
- ProcessingResult procResult = buffer.processAndGet();
- long[] originalData = procResult.getScaledLongs();
- int scalingExponent = procResult.getScalingExponent();
- if (originalData.length == 0) return;
- long minVal = findMin(originalData);
- long[] data = new long[originalData.length];
- for (int i = 0; i < data.length; i++) {
- data[i] = originalData[i] - minVal;
- }
-
- Object[] clusterResult = KClusterAlgorithm.run(data,k);
- long[] sortedMedoids = (long[]) clusterResult[0];
- int[] clusterAssignments = (int[]) clusterResult[1];
- long[] clusterFrequencies = (long[]) clusterResult[2];
-
- long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
-
- encodeResults(out, scalingExponent, minVal, sortedMedoids, clusterAssignments, sortedZigzagResiduals);
-
- buffer.clear();
- }
-
-
- @Override
- public void encode(int value, ByteArrayOutputStream out) {
- buffer.add(value);
+ public void add(int value) {
+ values.add((long) value);
}
@Override
- public void encode(long value, ByteArrayOutputStream out) {
- buffer.add(value);
+ public void add(long value) {
+ values.add(value);
}
@Override
- public void encode(float value, ByteArrayOutputStream out) {
- buffer.add(value);
+ public void add(float value) {
+ /* Do nothing, type mismatch */
}
@Override
- public void encode(double value, ByteArrayOutputStream out) {
- buffer.add(value);
- }
-
- private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids, int[] assignments, long[] frequencies) {
- int n = data.length;
- int k = medoids.length;
- if (n == 0) return new long[0];
-
- long[] sortedResiduals = new long[n];
- int[] writePointers = new int[k];
- int cumulativeCount = 0;
- for (int i = 0; i < k; i++) {
- writePointers[i] = cumulativeCount;
- cumulativeCount += (int) frequencies[i];
- }
-
- for (int i = 0; i < n; i++) {
- int clusterId = assignments[i];
- long medoid = medoids[clusterId];
- long residual = data[i] - medoid;
- long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
-
- int targetIndex = writePointers[clusterId];
- sortedResiduals[targetIndex] = zigzagResidual;
- writePointers[clusterId]++;
- }
- return sortedResiduals;
- }
-
- private void encodeResults(
- ByteArrayOutputStream out,
- int scalingExponent,
- long minVal,
- long[] medoids,
- int[] frequencies,
- long[] residuals)
- throws IOException {
-
- ClusterSupport writer = new ClusterSupport(out);
- int numPoints = frequencies.length;
- int k = medoids.length;
-
- writer.write(scalingExponent, 8);
- writer.write(k, 16);
- writer.write(numPoints, 16);
- writer.write(DEFAULT_PACK_SIZE, 16);
-
- int minValBit = ClusterSupport.bitsRequired(minVal);
- writer.write(minValBit, 8);
- writer.write(minVal>=0?0:1,1);
- writer.write(minVal, minValBit);
-
- if (k == 0) {
- writer.flush();
- return;
- }
-
- long minMedoid = findMin(medoids);
- long[] medoidOffsets = new long[k];
- int maxMedoidOffsetBits = 0;
- for (int i = 0; i < k; i++) {
- medoidOffsets[i] = medoids[i] - minMedoid;
- maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
- }
- int minMedoidBit = ClusterSupport.bitsRequired(minMedoid);
- writer.write(minMedoidBit,8);
- writer.write(minMedoid>0?0:1,1);
- writer.write(minMedoid, minMedoidBit);
-
- writer.write(maxMedoidOffsetBits, 8);
- for (long offset : medoidOffsets) {
- writer.write(offset, maxMedoidOffsetBits);
- }
-
- long[] freqDeltas = new long[k];
- if (k > 0) {
- freqDeltas[0] = frequencies[0];
- for (int i = 1; i < k; i++) {
- freqDeltas[i] = frequencies[i] - frequencies[i - 1];
- }
- }
- int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
- writer.write(numFreqBlocks, 16);
-
- int[] freqBlockMaxBits = new int[numFreqBlocks];
- // Metadata pass for frequencies
- for (int i = 0; i < numFreqBlocks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, k);
- long maxDelta = 0;
- for (int j = start; j < end; j++) {
- maxDelta = Math.max(maxDelta, freqDeltas[j]);
- }
- freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
- writer.write(freqBlockMaxBits[i], 8);
- }
- // Data pass for frequencies
- for (int i = 0; i < numFreqBlocks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, k);
- int bitsForBlock = freqBlockMaxBits[i];
- for (int j = start; j < end; j++) {
- writer.write(freqDeltas[j], bitsForBlock);
- }
- }
-
- int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
- writer.write(numPacks, 32);
-
- int[] resPackMaxBits = new int[numPacks];
- // Metadata pass for residuals
- for (int i = 0; i < numPacks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
- long maxOffset = 0;
- for (int j = start; j < end; j++) {
- maxOffset = Math.max(maxOffset, residuals[j]);
- }
- resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
- writer.write(resPackMaxBits[i], 8);
- }
- // Data pass for residuals
- for (int i = 0; i < numPacks; i++) {
- int start = i * DEFAULT_PACK_SIZE;
- int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
- int bitsForPack = resPackMaxBits[i];
- if (bitsForPack > 0) {
- for (int j = start; j < end; j++) {
- writer.write(residuals[j], bitsForPack);
- }
- }
- }
-
- writer.flush();
- }
-
- private long findMin(long[] data) {
- if (data == null || data.length == 0) {
- throw new IllegalArgumentException("Data array cannot be null or empty.");
- }
- long min = data[0];
- for (int i = 1; i < data.length; i++) {
- if (data[i] < min) {
- min = data[i];
- }
- }
- return min;
+ public void add(double value) {
+ /* Do nothing, type mismatch */
}
@Override
- public int getOneItemMaxSize() {
- return 8;
+    public KClusterEncoder.ProcessingResult processAndGet() { // Implements the new interface method
+ long[] data = values.stream().mapToLong(l -> l).toArray();
+ return new KClusterEncoder.ProcessingResult(data, 0); // Exponent is 0 for integers
}
@Override
- public long getMaxByteSize() {
- if (this.buffer.isEmpty()) {
- return 0;
+ public void clear() {
+ values.clear();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return values.isEmpty();
+ }
+
+ @Override
+ public int size() {
+ return values.size();
+ }
+ }
+
+ /** A buffer for FLOAT and DOUBLE types. It performs scaling in processAndGetLongs(). */
+ private static class DoubleBuffer implements KClusterEncoder.ValueBuffer {
+ private final List<Double> values = new ArrayList<>();
+
+ @Override
+ public void add(int value) {
+ /* Do nothing, type mismatch */
+ }
+
+ @Override
+ public void add(long value) {
+ /* Do nothing, type mismatch */
+ }
+
+ @Override
+    public void add(float value) {
+      values.add((double) value); // Store as double to unify
+    }
+
+ @Override
+ public void add(double value) {
+ values.add(value);
+ }
+
+ @Override
+ public KClusterEncoder.ProcessingResult processAndGet() {
+ // --- Edge Case: Handle empty buffer ---
+ if (values.isEmpty()) {
+ return new KClusterEncoder.ProcessingResult(new long[0], 0);
+ }
+ int maxDecimalPlaces = 0;
+ for (double v : values) {
+ String s = BigDecimal.valueOf(v).toPlainString();
+ int dotIndex = s.indexOf('.');
+ if (dotIndex != -1) {
+ int decimalPlaces = s.length() - dotIndex - 1;
+ if (decimalPlaces > maxDecimalPlaces) {
+ maxDecimalPlaces = decimalPlaces;
+ }
}
- return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+ }
+
+ double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+ long[] scaledLongs = new long[values.size()];
+ for (int i = 0; i < values.size(); i++) {
+ scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+ }
+
+ return new KClusterEncoder.ProcessingResult(scaledLongs, maxDecimalPlaces);
}
@Override
- public void encode(boolean value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
+ public void clear() {
+ values.clear();
}
@Override
- public void encode(short value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support short values.");
+ public boolean isEmpty() {
+ return values.isEmpty();
}
@Override
- public void encode(Binary value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
+ public int size() {
+ return values.size();
+ }
+ }
+
+ private static class ProcessingResult {
+ final long[] scaledLongs;
+ final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+ ProcessingResult(long[] scaledLongs, int scalingExponent) {
+ this.scaledLongs = scaledLongs;
+ this.scalingExponent = scalingExponent;
}
- @Override
- public void encode(BigDecimal value, ByteArrayOutputStream out) {
- throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
+ long[] getScaledLongs() {
+ return this.scaledLongs;
}
+
+ int getScalingExponent() {
+ return this.scalingExponent;
+ }
+ }
+
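+  /**
+   * Clusters the buffered page values (shifted by their minimum), then writes the sorted medoids,
+   * cluster metadata, and per-cluster zigzag residuals.
+   */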
+ @Override
+ public void flush(ByteArrayOutputStream out) throws IOException {
+ if (buffer.isEmpty()) {
+ return;
+ }
+
+ ProcessingResult procResult = buffer.processAndGet();
+ long[] originalData = procResult.getScaledLongs();
+ int scalingExponent = procResult.getScalingExponent();
+ if (originalData.length == 0) return;
+ long minVal = findMin(originalData);
+ long[] data = new long[originalData.length];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = originalData[i] - minVal;
+ }
+
+ Object[] clusterResult = KClusterAlgorithm.run(data, k);
+ long[] sortedMedoids = (long[]) clusterResult[0];
+ int[] clusterAssignments = (int[]) clusterResult[1];
+ long[] clusterFrequencies = (long[]) clusterResult[2];
+
+ long[] sortedZigzagResiduals =
+ calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
+
+ encodeResults(
+ out, scalingExponent, minVal, sortedMedoids, clusterAssignments, sortedZigzagResiduals);
+
+ buffer.clear();
+ }
+
+ @Override
+ public void encode(int value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(long value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(float value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(double value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ private long[] calculateSortedZigzagResiduals(
+ long[] data, long[] medoids, int[] assignments, long[] frequencies) {
+ int n = data.length;
+ int k = medoids.length;
+ if (n == 0) return new long[0];
+
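+    // Residuals are written grouped by cluster in medoid order; writePointers tracks
+    // the next output slot for each cluster.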
+ long[] sortedResiduals = new long[n];
+ int[] writePointers = new int[k];
+ int cumulativeCount = 0;
+ for (int i = 0; i < k; i++) {
+ writePointers[i] = cumulativeCount;
+ cumulativeCount += (int) frequencies[i];
+ }
+
+ for (int i = 0; i < n; i++) {
+ int clusterId = assignments[i];
+ long medoid = medoids[clusterId];
+ long residual = data[i] - medoid;
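+      // Zigzag mapping: 0, -1, 1, -2, 2, ... -> 0, 1, 2, 3, 4, ... so small magnitudes pack into few bits.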
+ long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
+
+ int targetIndex = writePointers[clusterId];
+ sortedResiduals[targetIndex] = zigzagResidual;
+ writePointers[clusterId]++;
+ }
+ return sortedResiduals;
+ }
+
+ private void encodeResults(
+ ByteArrayOutputStream out,
+ int scalingExponent,
+ long minVal,
+ long[] medoids,
+ int[] frequencies,
+ long[] residuals)
+ throws IOException {
+
+ ClusterSupport writer = new ClusterSupport(out);
+ int numPoints = frequencies.length;
+ int k = medoids.length;
+
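+    // Header: scaling exponent (8 bits), k (16), point count (16), pack size (16),
+    // followed by the bit-packed minimum value.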
+ writer.write(scalingExponent, 8);
+ writer.write(k, 16);
+ writer.write(numPoints, 16);
+ writer.write(DEFAULT_PACK_SIZE, 16);
+
+ int minValBit = ClusterSupport.bitsRequired(minVal);
+ writer.write(minValBit, 8);
+ writer.write(minVal >= 0 ? 0 : 1, 1);
+ writer.write(minVal, minValBit);
+
+ if (k == 0) {
+ writer.flush();
+ return;
+ }
+
+ long minMedoid = findMin(medoids);
+ long[] medoidOffsets = new long[k];
+ int maxMedoidOffsetBits = 0;
+ for (int i = 0; i < k; i++) {
+ medoidOffsets[i] = medoids[i] - minMedoid;
+ maxMedoidOffsetBits =
+ Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
+ }
+ int minMedoidBit = ClusterSupport.bitsRequired(minMedoid);
+ writer.write(minMedoidBit, 8);
+ writer.write(minMedoid > 0 ? 0 : 1, 1);
+ writer.write(minMedoid, minMedoidBit);
+
+ writer.write(maxMedoidOffsetBits, 8);
+ for (long offset : medoidOffsets) {
+ writer.write(offset, maxMedoidOffsetBits);
+ }
+
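+    // Delta-encode the frequency values, then bit-pack each block of DEFAULT_PACK_SIZE
+    // deltas with its own per-block bit width.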
+ long[] freqDeltas = new long[k];
+ if (k > 0) {
+ freqDeltas[0] = frequencies[0];
+ for (int i = 1; i < k; i++) {
+ freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+ }
+ }
+ int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numFreqBlocks, 16);
+
+ int[] freqBlockMaxBits = new int[numFreqBlocks];
+ // Metadata pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ long maxDelta = 0;
+ for (int j = start; j < end; j++) {
+ maxDelta = Math.max(maxDelta, freqDeltas[j]);
+ }
+ freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+ writer.write(freqBlockMaxBits[i], 8);
+ }
+ // Data pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ int bitsForBlock = freqBlockMaxBits[i];
+ for (int j = start; j < end; j++) {
+ writer.write(freqDeltas[j], bitsForBlock);
+ }
+ }
+
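+    // Residuals are bit-packed in fixed-size packs, each pack using the bit width
+    // of its largest value.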
+ int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numPacks, 32);
+
+ int[] resPackMaxBits = new int[numPacks];
+ // Metadata pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ long maxOffset = 0;
+ for (int j = start; j < end; j++) {
+ maxOffset = Math.max(maxOffset, residuals[j]);
+ }
+ resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+ writer.write(resPackMaxBits[i], 8);
+ }
+ // Data pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ int bitsForPack = resPackMaxBits[i];
+ if (bitsForPack > 0) {
+ for (int j = start; j < end; j++) {
+ writer.write(residuals[j], bitsForPack);
+ }
+ }
+ }
+
+ writer.flush();
+ }
+
+ private long findMin(long[] data) {
+ if (data == null || data.length == 0) {
+ throw new IllegalArgumentException("Data array cannot be null or empty.");
+ }
+ long min = data[0];
+ for (int i = 1; i < data.length; i++) {
+ if (data[i] < min) {
+ min = data[i];
+ }
+ }
+ return min;
+ }
+
+ @Override
+ public int getOneItemMaxSize() {
+ return 8;
+ }
+
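+  /** Conservative size estimate: raw payload plus 50% headroom for headers and packing metadata. */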
+ @Override
+ public long getMaxByteSize() {
+ if (this.buffer.isEmpty()) {
+ return 0;
+ }
+ return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+ }
+
+ @Override
+ public void encode(boolean value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("KClusterEncoder does not support boolean values.");
+ }
+
+ @Override
+ public void encode(short value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("KClusterEncoder does not support short values.");
+ }
+
+ @Override
+ public void encode(Binary value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("KClusterEncoder does not support Binary values.");
+ }
+
+ @Override
+ public void encode(BigDecimal value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("KClusterEncoder does not support BigDecimal values.");
+ }
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
index 4a25c88..5bcb401 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -214,9 +214,7 @@
}
@Override
- public void initFromProps(Map<String, String> props) {
-
- }
+ public void initFromProps(Map<String, String> props) {}
}
public static class KCluster extends TSEncodingBuilder {
@@ -255,12 +253,12 @@
int parsedK = Integer.parseInt(kStr);
if (parsedK <= 0) {
throw new IllegalArgumentException(
- "KCLUSTER parameter k must be a positive integer, but was " + parsedK);
+ "KCLUSTER parameter k must be a positive integer, but was " + parsedK);
}
this.k = parsedK;
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
- "KCLUSTER parameter k must be an integer, but was " + kStr);
+ "KCLUSTER parameter k must be an integer, but was " + kStr);
}
}
}
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
index fcc4dca..e1b6ea1 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
@@ -1,8 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.tsfile.encoding.decoder;
import org.apache.tsfile.encoding.encoder.AClusterEncoder;
import org.apache.tsfile.encoding.encoder.Encoder;
import org.apache.tsfile.enums.TSDataType;
+
import org.junit.Test;
import java.io.ByteArrayOutputStream;
@@ -10,7 +30,11 @@
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -18,193 +42,195 @@
/**
* Test suite for AClusterEncoder and AClusterDecoder.
- * <p>
- * This test validates the end-to-end encoding and decoding process.
- * Since ACluster algorithm reorders data based on clusters, the validation
- * cannot compare original and decoded lists element by element. Instead, it
- * verifies that the set of unique values and their frequencies are identical
- * before and after the process.
- * </p>
- * <p>
- * The test structure is adapted to the iterator-style Decoder interface
- * (hasNext(buffer), readXXX(buffer)).
- * </p>
+ *
+ * <p>This test validates the end-to-end encoding and decoding process. Since the ACluster algorithm
+ * reorders data based on clusters, the validation cannot compare original and decoded lists element
+ * by element. Instead, it verifies that the set of unique values and their frequencies are
+ * identical before and after the process.
+ *
+ * <p>The test structure is adapted to the iterator-style Decoder interface (hasNext(buffer),
+ * readXXX(buffer)).
*/
public class AClusterEncoderDecoderTest {
- private static final int ROW_NUM = 1000;
- private final Random ran = new Random();
+ private static final int ROW_NUM = 1000;
+ private final Random ran = new Random();
- // =================================================================================
- // Integer Tests
- // =================================================================================
+ // =================================================================================
+ // Integer Tests
+ // =================================================================================
- @Test
- public void testIntBasicClusters() throws IOException {
- List<Integer> data = new ArrayList<>();
- // Three distinct clusters
- for (int i = 0; i < 300; i++) data.add(100 + ran.nextInt(10)); // Cluster around 100
- for (int i = 0; i < 400; i++) data.add(5000 + ran.nextInt(20)); // Cluster around 5000
- for (int i = 0; i < 300; i++) data.add(100000 + ran.nextInt(5)); // Cluster around 100000
- shouldReadAndWrite(data, TSDataType.INT32);
+ @Test
+ public void testIntBasicClusters() throws IOException {
+ List<Integer> data = new ArrayList<>();
+ // Three distinct clusters
+ for (int i = 0; i < 300; i++) data.add(100 + ran.nextInt(10)); // Cluster around 100
+ for (int i = 0; i < 400; i++) data.add(5000 + ran.nextInt(20)); // Cluster around 5000
+ for (int i = 0; i < 300; i++) data.add(100000 + ran.nextInt(5)); // Cluster around 100000
+ shouldReadAndWrite(data, TSDataType.INT32);
+ }
+
+ // =================================================================================
+ // Long Tests
+ // =================================================================================
+
+ @Test
+ public void testLongBasic() throws IOException {
+ List<Long> data = new ArrayList<>();
+ for (int i = 0; i < ROW_NUM; i++) {
+ data.add((long) i * i * i);
+ }
+ shouldReadAndWrite(data, TSDataType.INT64);
+ }
+
+ // =================================================================================
+ // Double Tests
+ // =================================================================================
+
+ @Test
+ public void testDoubleWithPrecision() throws IOException {
+ List<Double> data = new ArrayList<>();
+ final int precision = 6;
+
+ System.out.println(
+ "Testing double with controlled precision (max " + precision + " decimal places)...");
+
+ for (int i = 0; i < ROW_NUM / 2; i++) {
+ double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+ double rawValue = 123.456 + randomPart;
+
+ double cleanValue = cleanDouble(rawValue, precision + 3);
+ data.add(cleanValue);
}
+ for (int i = 0; i < ROW_NUM / 2; i++) {
+ double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+ double rawValue = 9999.0 + randomPart;
- // =================================================================================
- // Long Tests
- // =================================================================================
-
- @Test
- public void testLongBasic() throws IOException {
- List<Long> data = new ArrayList<>();
- for (int i = 0; i < ROW_NUM; i++) {
- data.add((long) i * i * i);
- }
- shouldReadAndWrite(data, TSDataType.INT64);
+ double cleanValue = cleanDouble(rawValue, precision);
+ data.add(cleanValue);
}
- // =================================================================================
- // Double Tests
- // =================================================================================
-
- @Test
- public void testDoubleWithPrecision() throws IOException {
- List<Double> data = new ArrayList<>();
- final int precision = 6;
-
- System.out.println("Testing double with controlled precision (max " + precision + " decimal places)...");
-
- for (int i = 0; i < ROW_NUM / 2; i++) {
- double randomPart = nextRandomDoubleWithPrecision(ran, precision);
- double rawValue = 123.456 + randomPart;
-
- double cleanValue = cleanDouble(rawValue, precision + 3);
- data.add(cleanValue);
- }
-
- for (int i = 0; i < ROW_NUM / 2; i++) {
- double randomPart = nextRandomDoubleWithPrecision(ran, precision);
- double rawValue = 9999.0 + randomPart;
-
- double cleanValue = cleanDouble(rawValue, precision);
- data.add(cleanValue);
- }
-
- if (!data.isEmpty()) {
- System.out.println("Sample generated data point (after cleaning): " + data.get(0));
- }
-
- shouldReadAndWrite(data, TSDataType.DOUBLE);
+ if (!data.isEmpty()) {
+ System.out.println("Sample generated data point (after cleaning): " + data.get(0));
}
- private double cleanDouble(double value, int maxPrecision) {
- BigDecimal bd = new BigDecimal(value);
- BigDecimal roundedBd = bd.setScale(maxPrecision, RoundingMode.HALF_UP);
- return roundedBd.doubleValue();
- }
- // =================================================================================
- // Edge Case Tests
- // =================================================================================
+ shouldReadAndWrite(data, TSDataType.DOUBLE);
+ }
- @Test
- public void testSingleValue() throws IOException {
- shouldReadAndWrite(Arrays.asList(123.00000001,123.00000002, 29.0001,29.0002,29.000001), TSDataType.DOUBLE);
+ private double cleanDouble(double value, int maxPrecision) {
+ BigDecimal bd = new BigDecimal(value);
+ BigDecimal roundedBd = bd.setScale(maxPrecision, RoundingMode.HALF_UP);
+ return roundedBd.doubleValue();
+ }
+
+ // =================================================================================
+ // Edge Case Tests
+ // =================================================================================
+
+ @Test
+ public void testSingleValue() throws IOException {
+ shouldReadAndWrite(
+ Arrays.asList(123.00000001, 123.00000002, 29.0001, 29.0002, 29.000001), TSDataType.DOUBLE);
+ }
+
+ @Test
+ public void testAllSameValues() throws IOException {
+ List<Integer> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) data.add(777);
+ shouldReadAndWrite(data, TSDataType.INT32);
+ }
+
+ // =================================================================================
+ // Core Test Logic and Helpers
+ // =================================================================================
+
+ private double nextRandomDoubleWithPrecision(Random random, int precision) {
+ if (precision < 0) {
+ throw new IllegalArgumentException("Precision must be non-negative.");
+ }
+ double factor = Math.pow(10, precision);
+
+ double scaled = random.nextDouble() * factor;
+ long rounded = Math.round(scaled);
+ return rounded / factor;
+ }
+
+ /** Generic helper to write a list of data using the appropriate encoder method. */
+ private <T extends Number> void writeData(
+ List<T> data, Encoder encoder, ByteArrayOutputStream out) throws IOException {
+ if (data.isEmpty()) {
+ return;
+ }
+ // Use instanceof to call the correct overloaded encode method
+ if (data.get(0) instanceof Integer) {
+ data.forEach(val -> encoder.encode((Integer) val, out));
+ } else if (data.get(0) instanceof Long) {
+ data.forEach(val -> encoder.encode((Long) val, out));
+ } else if (data.get(0) instanceof Float) {
+ data.forEach(val -> encoder.encode((Float) val, out));
+ } else if (data.get(0) instanceof Double) {
+ data.forEach(val -> encoder.encode((Double) val, out));
+ }
+ encoder.flush(out);
+ }
+
+ /**
+ * The main validation method. It encodes the given data, then decodes it, and finally compares
+ * the frequency maps of the original and decoded data.
+ */
+ private <T extends Number> void shouldReadAndWrite(List<T> originalData, TSDataType dataType)
+ throws IOException {
+ // 1. Prepare for encoding
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Encoder encoder = new AClusterEncoder(dataType);
+
+ // 2. Encode the data
+ writeData(originalData, encoder, out);
+ ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
+
+ // 3. Decode the data using the iterator-style interface
+ Decoder decoder = new ClusterDecoder(dataType);
+ List<T> decodedData = new ArrayList<>();
+
+ while (decoder.hasNext(buffer)) {
+ switch (dataType) {
+ case INT32:
+ decodedData.add((T) Integer.valueOf(decoder.readInt(buffer)));
+ break;
+ case INT64:
+ decodedData.add((T) Long.valueOf(decoder.readLong(buffer)));
+ break;
+ case FLOAT:
+ decodedData.add((T) Float.valueOf(decoder.readFloat(buffer)));
+ break;
+ case DOUBLE:
+ decodedData.add((T) Double.valueOf(decoder.readDouble(buffer)));
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported data type for test");
+ }
}
- @Test
- public void testAllSameValues() throws IOException {
- List<Integer> data = new ArrayList<>();
- for(int i = 0; i < 100; i++) data.add(777);
- shouldReadAndWrite(data, TSDataType.INT32);
- }
+ // 4. Validate the results
+ // First, a quick check on the total count
+ assertEquals(
+ "Decoded data size should match original data size",
+ originalData.size(),
+ decodedData.size());
- // =================================================================================
- // Core Test Logic and Helpers
- // =================================================================================
+ // Second, the robust check using frequency maps
+ Map<T, Long> originalFrequencies = getFrequencyMap(originalData);
+ Map<T, Long> decodedFrequencies = getFrequencyMap(decodedData);
- private double nextRandomDoubleWithPrecision(Random random, int precision) {
- if (precision < 0) {
- throw new IllegalArgumentException("Precision must be non-negative.");
- }
- double factor = Math.pow(10, precision);
+ assertEquals(
+ "Frequency maps of original and decoded data should be identical",
+ originalFrequencies,
+ decodedFrequencies);
+ }
- double scaled = random.nextDouble() * factor;
- long rounded = Math.round(scaled);
- return rounded / factor;
- }
-
- /**
- * Generic helper to write a list of data using the appropriate encoder method.
- */
- private <T extends Number> void writeData(List<T> data, Encoder encoder, ByteArrayOutputStream out) throws IOException {
- if (data.isEmpty()) {
- return;
- }
- // Use instanceof to call the correct overloaded encode method
- if (data.get(0) instanceof Integer) {
- data.forEach(val -> encoder.encode((Integer) val, out));
- } else if (data.get(0) instanceof Long) {
- data.forEach(val -> encoder.encode((Long) val, out));
- } else if (data.get(0) instanceof Float) {
- data.forEach(val -> encoder.encode((Float) val, out));
- } else if (data.get(0) instanceof Double) {
- data.forEach(val -> encoder.encode((Double) val, out));
- }
- encoder.flush(out);
- }
-
- /**
- * The main validation method. It encodes the given data, then decodes it,
- * and finally compares the frequency maps of the original and decoded data.
- */
- private <T extends Number> void shouldReadAndWrite(List<T> originalData, TSDataType dataType) throws IOException {
- // 1. Prepare for encoding
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Encoder encoder = new AClusterEncoder(dataType);
-
- // 2. Encode the data
- writeData(originalData, encoder, out);
- ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
-
- // 3. Decode the data using the iterator-style interface
- Decoder decoder = new ClusterDecoder(dataType);
- List<T> decodedData = new ArrayList<>();
-
- while (decoder.hasNext(buffer)) {
- switch (dataType) {
- case INT32:
- decodedData.add((T) Integer.valueOf(decoder.readInt(buffer)));
- break;
- case INT64:
- decodedData.add((T) Long.valueOf(decoder.readLong(buffer)));
- break;
- case FLOAT:
- decodedData.add((T) Float.valueOf(decoder.readFloat(buffer)));
- break;
- case DOUBLE:
- decodedData.add((T) Double.valueOf(decoder.readDouble(buffer)));
- break;
- default:
- throw new UnsupportedOperationException("Unsupported data type for test");
- }
- }
-
- // 4. Validate the results
- // First, a quick check on the total count
- assertEquals("Decoded data size should match original data size", originalData.size(), decodedData.size());
-
- // Second, the robust check using frequency maps
- Map<T, Long> originalFrequencies = getFrequencyMap(originalData);
- Map<T, Long> decodedFrequencies = getFrequencyMap(decodedData);
-
- assertEquals("Frequency maps of original and decoded data should be identical", originalFrequencies, decodedFrequencies);
- }
-
- /**
- * Helper method to count frequencies of elements in a list.
- */
- private <T extends Number> Map<T, Long> getFrequencyMap(List<T> list) {
- return list.stream()
- .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
- }
-}
\ No newline at end of file
+ /** Helper method to count frequencies of elements in a list. */
+ private <T extends Number> Map<T, Long> getFrequencyMap(List<T> list) {
+ return list.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+ }
+}
diff --git a/pom.xml b/pom.xml
index 1ee0d80..9aa7a33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -323,6 +323,16 @@
<excludeCoordinates>
<!-- TODO: For this CVE no fix exists yet (Keep an eye on it) -->
<exclude>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.15.0</version>
+ </exclude>
+ <exclude>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.10.1</version>
+ </exclude>
+ <exclude>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.97.Final</version>