Research/cluster compress (#614)

diff --git a/cpp/src/common/db_common.h b/cpp/src/common/db_common.h
index 8f87e5d..9c8ec0d 100644
--- a/cpp/src/common/db_common.h
+++ b/cpp/src/common/db_common.h
@@ -52,6 +52,8 @@
     GORILLA = 8,
     ZIGZAG = 9,
     FREQ = 10,
+    KCLUSTER = 11,
+    ACLUSTER = 12,
     INVALID_ENCODING = 255
 };
 
@@ -68,7 +70,7 @@
 };
 
 extern const char* s_data_type_names[8];
-extern const char* s_encoding_names[12];
+extern const char* s_encoding_names[14];
 extern const char* s_compression_names[8];
 
 FORCE_INLINE const char* get_data_type_name(TSDataType type) {
@@ -77,7 +79,7 @@
 }
 
 FORCE_INLINE const char* get_encoding_name(TSEncoding encoding) {
-    ASSERT(encoding >= PLAIN && encoding <= FREQ);
+    ASSERT(encoding >= PLAIN && encoding <= ACLUSTER);
     return s_encoding_names[encoding];
 }
 
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index 7913a2b..7189abe 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -90,9 +90,9 @@
 const char* s_data_type_names[8] = {"BOOLEAN", "INT32", "INT64", "FLOAT",
                                     "DOUBLE",  "TEXT",  "VECTOR", "STRING"};
 
-const char* s_encoding_names[12] = {
+const char* s_encoding_names[14] = {
     "PLAIN",      "DICTIONARY", "RLE",     "DIFF",   "TS_2DIFF", "BITMAP",
-    "GORILLA_V1", "REGULAR",    "GORILLA", "ZIGZAG", "FREQ"};
+    "GORILLA_V1", "REGULAR",    "GORILLA", "ZIGZAG", "FREQ", "KCLUSTER", "ACLUSTER"};
 
 const char* s_compression_names[8] = {
     "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "SDT", "PAA", "PLA", "LZ4",
diff --git a/cpp/src/encoding/acluster_decoder.h b/cpp/src/encoding/acluster_decoder.h
new file mode 100644
index 0000000..7d8c81e
--- /dev/null
+++ b/cpp/src/encoding/acluster_decoder.h
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ENCODING_ACLUSTER_DECODER_H
+#define ENCODING_ACLUSTER_DECODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <stdexcept>
+#include <limits>
+#include <cstdint>
+
+#include "decoder.h"
+#include "common/allocator/byte_stream.h"
+
+// ===================================================================
+//   Part 1: Core ACluster Decoding Logic
+// ===================================================================
+namespace AClusterDecodeLogic {
+// The decoding logic here is identical to KClusterDecodeLogic: both encodings
+// share the same decode_multidim_impl bitstream format.
+// Corresponds to: cluster_encoder_logic.cpp :: BitStreamReader
+class BitStreamReader {
+public:
+    explicit BitStreamReader(const std::vector<uint8_t>& data)
+        : buffer(data), bufferSize(data.size() * 8), currentBitPos(0) {}
+
+    inline long long readBits(int numBits) {
+        if (numBits < 0 || numBits > 64) throw std::invalid_argument("Cannot read " + std::to_string(numBits) + " bits.");
+        if (currentBitPos + numBits > bufferSize) throw std::out_of_range("Not enough bits in stream to read " + std::to_string(numBits) + " bits.");
+
+        // Accumulate in an unsigned type: shifting a set bit into the sign bit
+        // of a signed long long (possible for 64-bit reads) is undefined behavior.
+        unsigned long long value = 0;
+        for (int i = 0; i < numBits; ++i) {
+            size_t byteIndex = currentBitPos / 8;
+            int bitIndex = 7 - (currentBitPos % 8);
+            bool bit = (buffer[byteIndex] & (1 << bitIndex)) != 0;
+            value = (value << 1) | (bit ? 1ULL : 0ULL);
+            currentBitPos++;
+        }
+        return static_cast<long long>(value);
+    }
+
+    inline bool has_more() const {
+        return currentBitPos < bufferSize;
+    }
+
+private:
+    const std::vector<uint8_t>& buffer;
+    const size_t bufferSize;
+    size_t currentBitPos;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp :: zigzagDecode
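+// Example: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, 4 -> 2, ...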
+inline long long zigzagDecode(long long n) { return (n >> 1) ^ (-(n & 1)); }
+
+// Corresponds to: cluster_encoder_logic.cpp :: decode_multidim_impl
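+// Bitstream layout consumed here (matching what the encoders in this PR emit):
+//   global header: dim(8) | pack_size(16) | block_size(16) | page_count(32) | page_size(16)
+//   per page:      k(16) | page_data_points(16)
+//     per dim: max_decimals(8), min_value as bit_len(8) + sign(1) + abs(bit_len)
+//     per dim: min_base as bit_len(8) + sign(1) + abs(bit_len), max_base_bit_len(8)
+//     pack_num(16), then per pack per dim: residual bit width(8)
+//     medoid offsets (max_base_bit_len[j] bits each), block-packed cluster-size
+//     deltas, zigzag residuals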
+inline std::vector<std::vector<double>> decode_multidim_impl(const std::vector<uint8_t>& compressed_data) {
+    if (compressed_data.empty()) return {};
+    BitStreamReader reader(compressed_data);
+
+    int dim = reader.readBits(8);
+    int pack_size = reader.readBits(16);
+    int block_size = reader.readBits(16);
+    int page_count = reader.readBits(32);
+    int page_size = reader.readBits(16);
+    (void)page_size; // page_size is not needed here; pack_size is used below when unpacking residuals
+
+    std::vector<double> pow10_lookup; for (int i = 0; i < 20; ++i) pow10_lookup.push_back(std::pow(10, i));
+    std::vector<std::vector<double>> all_data_rows;
+    
+    for (int i = 0; i < page_count; ++i) {
+        int k = reader.readBits(16);
+        int page_data_points = reader.readBits(16);
+
+        std::vector<int> max_decimals(dim);
+        std::vector<long long> min_values(dim);
+        for (int j = 0; j < dim; ++j) {
+            max_decimals[j] = reader.readBits(8);
+            int bit_len = reader.readBits(8);
+            long long sign = reader.readBits(1);
+            long long abs_val = reader.readBits(bit_len);
+            min_values[j] = (sign == 0) ? abs_val : -abs_val;
+        }
+        
+        std::vector<long long> min_bases(dim);
+        std::vector<int> max_base_bit_len(dim);
+        for (int j = 0; j < dim; ++j) {
+            int bit_len = reader.readBits(8);
+            long long sign = reader.readBits(1);
+            long long abs_val = reader.readBits(bit_len);
+            min_bases[j] = (sign == 0) ? abs_val : -abs_val;
+            max_base_bit_len[j] = reader.readBits(8);
+        }
+        
+        int pack_num = reader.readBits(16);
+        std::vector<std::vector<int>> pack_metadata(pack_num, std::vector<int>(dim));
+        for (int p = 0; p < pack_num; ++p) for (int j = 0; j < dim; ++j) pack_metadata[p][j] = reader.readBits(8);
+        
+        std::vector<std::vector<long long>> medoids_long(k, std::vector<long long>(dim));
+        for (int m = 0; m < k; ++m) for (int j = 0; j < dim; ++j) medoids_long[m][j] = reader.readBits(max_base_bit_len[j]) + min_bases[j];
+        
+        std::vector<long long> cluster_sizes(k, 0);
+        if (k > 0) {
+            int num_freq_blocks = (k + block_size - 1) / block_size;
+            std::vector<int> max_bits_per_block(num_freq_blocks);
+            for (int b = 0; b < num_freq_blocks; ++b) max_bits_per_block[b] = reader.readBits(8);
+            std::vector<long long> deltas(k);
+            int freq_idx = 0;
+            for (int b = 0; b < num_freq_blocks && freq_idx < k; ++b) {
+                int max_bit_for_this_block = max_bits_per_block[b];
+                int end = std::min((b + 1) * block_size, k);
+                for (int j = freq_idx; j < end; ++j) deltas[j] = reader.readBits(max_bit_for_this_block);
+                freq_idx = end;
+            }
+            cluster_sizes[0] = deltas[0];
+            for (size_t j = 1; j < deltas.size(); ++j) cluster_sizes[j] = cluster_sizes[j - 1] + deltas[j];
+        }
+        
+        std::vector<std::vector<long long>> residual_series(page_data_points, std::vector<long long>(dim));
+        int data_counter = 0;
+        if (page_data_points > 0) {
+            for (int p = 0; p < pack_num; ++p) {
+                const auto& bits = pack_metadata[p];
+                int points_in_pack = (p == pack_num - 1) ? (page_data_points - (p * pack_size)) : pack_size;
+                for (int pt = 0; pt < points_in_pack; ++pt) {
+                    if (data_counter >= page_data_points) break;
+                    for (int j = 0; j < dim; ++j) residual_series[data_counter][j] = zigzagDecode(reader.readBits(bits[j]));
+                    data_counter++;
+                }
+            }
+        }
+        
+        if (page_data_points > 0 && k > 0) {
+            int current_point_idx = 0;
+            for (int medoid_idx = 0; medoid_idx < k; ++medoid_idx) {
+                long long points_in_this_cluster = cluster_sizes[medoid_idx];
+                const auto& base_point = medoids_long[medoid_idx];
+                for (int p_count = 0; p_count < points_in_this_cluster; ++p_count) {
+                    if (current_point_idx < page_data_points) {
+                        std::vector<double> row(dim);
+                        for (int j = 0; j < dim; ++j) {
+                            long long int_val = base_point[j] + residual_series[current_point_idx][j] + min_values[j];
+                            row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+                        }
+                        all_data_rows.push_back(row);
+                        current_point_idx++;
+                    }
+                }
+            }
+        } else if (page_data_points > 0) {
+            for (int d = 0; d < page_data_points; ++d) {
+                std::vector<double> row(dim);
+                for (int j = 0; j < dim; ++j) {
+                     long long int_val = residual_series[d][j] + min_values[j];
+                     row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+                }
+                all_data_rows.push_back(row);
+            }
+        }
+    }
+    return all_data_rows;
+}
+
+} // namespace AClusterDecodeLogic
+
+
+// ===================================================================
+//   Part 2: The Decoder classes that adapt the logic for TsFile
+// ===================================================================
+namespace storage {
+
+template<typename T>
+class AClusterDecoderBase : public Decoder {
+public:
+    AClusterDecoderBase() : current_read_index_(0), decoded_(false) {}
+    ~AClusterDecoderBase() override {}
+
+    int read_boolean(bool&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_int32(int32_t&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_int64(int64_t&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }  // int64_t, matching KClusterDecoderBase
+    int read_float(float&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_double(double&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_String(common::String&, common::PageArena&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+
+protected:
+    void decode_page_if_needed(common::ByteStream &in);
+    std::vector<T> decoded_points_;
+    size_t current_read_index_;
+    bool decoded_;
+};
+
+class DoubleAClusterDecoder : public AClusterDecoderBase<double> {
+public:
+    int read_double(double& val, common::ByteStream& in) override;
+    int read_float(float& val, common::ByteStream& in) override;
+};
+
+class IntAClusterDecoder : public AClusterDecoderBase<long long> {
+public:
+    int read_int64(long long& val, common::ByteStream& in) override;
+    int read_int32(int32_t& val, common::ByteStream& in) override;
+};
+
+
+// --- Implementation ---
+template<typename T>
+inline void AClusterDecoderBase<T>::decode_page_if_needed(common::ByteStream &in) {
+    if (decoded_ || !in.has_remaining()) return;
+
+    uint32_t data_size = 0;
+    uint32_t read_len = 0;
+    in.read_buf(reinterpret_cast<char*>(&data_size), sizeof(data_size), read_len);
+    if (read_len != sizeof(data_size)) throw std::runtime_error("Failed to read ACluster data block size");
+    if (data_size == 0) {
+        decoded_ = true;
+        return;
+    }
+
+    std::vector<uint8_t> compressed_block(data_size);
+    in.read_buf(reinterpret_cast<char*>(compressed_block.data()), data_size, read_len);
+    if (read_len != data_size) throw std::runtime_error("Failed to read ACluster data block");
+
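+    // The encoder's flush() writes only the per-page body, so prepend the
+    // synthetic global header decode_multidim_impl expects, using the same
+    // constants that flush() assumes (dim = 1, pack_size = 10, block_size = 10).
+    // The header is 88 bits, i.e. exactly 11 bytes, so byte-level concatenation is safe.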
+    AClusterLogic::BitBuffer fake_bytestream;
+    int dim = 1, pack_size = 10, block_size = 10, page_count = 1, page_size = 0;
+    AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, dim, 8);
+    AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, pack_size, 16);
+    AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, block_size, 16);
+    AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_count, 32);
+    AClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_size, 16);
+    
+    std::vector<uint8_t> final_bytes_to_decode = fake_bytestream.toByteArray();
+    final_bytes_to_decode.insert(final_bytes_to_decode.end(), compressed_block.begin(), compressed_block.end());
+    
+    std::vector<std::vector<double>> result_multidim = AClusterDecodeLogic::decode_multidim_impl(final_bytes_to_decode);
+    
+    if (!result_multidim.empty()) {
+        decoded_points_.reserve(result_multidim.size());
+        for(const auto& row : result_multidim) {
+            if (!row.empty()) {
+                decoded_points_.push_back(static_cast<T>(row[0]));
+            }
+        }
+    }
+    decoded_ = true;
+}
+
+inline int DoubleAClusterDecoder::read_double(double& val, common::ByteStream& in) {
+    try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+    if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+    val = decoded_points_[current_read_index_++];
+    return common::E_OK;
+}
+inline int DoubleAClusterDecoder::read_float(float& val, common::ByteStream& in) {
+    double d_val;
+    int ret = read_double(d_val, in);
+    if (ret == common::E_OK) val = static_cast<float>(d_val);
+    return ret;
+}
+
+inline int IntAClusterDecoder::read_int64(long long& val, common::ByteStream& in) {
+    try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+    if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+    val = decoded_points_[current_read_index_++];
+    return common::E_OK;
+}
+inline int IntAClusterDecoder::read_int32(int32_t& val, common::ByteStream& in) {
+    long long ll_val;
+    int ret = read_int64(ll_val, in);
+    if (ret == common::E_OK) {
+        if (ll_val < std::numeric_limits<int32_t>::min() || ll_val > std::numeric_limits<int32_t>::max()) {
+            return common::E_DECODING_ERROR;
+        }
+        val = static_cast<int32_t>(ll_val);
+    }
+    return ret;
+}
+
+} // namespace storage
+#endif // ENCODING_ACLUSTER_DECODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/acluster_encoder.h b/cpp/src/encoding/acluster_encoder.h
new file mode 100644
index 0000000..1f7625b
--- /dev/null
+++ b/cpp/src/encoding/acluster_encoder.h
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ENCODING_ACLUSTER_ENCODER_H
+#define ENCODING_ACLUSTER_ENCODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <numeric>
+#include <algorithm>
+#include <stdexcept>
+#include <limits>
+#include <unordered_set>
+#include <random>
+#include <sstream>
+#include <functional>
+#include <cstdint> // For uint8_t, size_t, etc.
+#include <cstdlib> // For the std::abs(long long) overload used in flush()
+
+#include "encoder.h"
+#include "common/allocator/byte_stream.h"
+#include "common/global.h"
+
+// ===================================================================
+//   Part 1: Core ACluster Algorithm Logic
+// ===================================================================
+
+namespace AClusterLogic {
+
+// Corresponds to: algorithms/cluster_common.h
+template<typename T>
+struct SoAData {
+    std::vector<std::vector<T>> columns;
+    int dim = 0;
+    int num_rows = 0;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct ClusteringResult {
+    std::vector<std::vector<long long>> medoids;
+    std::vector<int> cluster_assignment;
+    std::vector<long long> cluster_sizes;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct PreprocessorResult {
+    std::unique_ptr<SoAData<long long>> data;
+    std::vector<int> max_decimal_places;
+    std::vector<long long> min_values;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp
+struct BitBuffer {
+    std::vector<uint8_t> buffer;
+    size_t currentBitPos = 0;
+    
+    inline void appendBit(bool bit) {
+        size_t byteIndex = currentBitPos / 8;
+        int bitIndexInByte = currentBitPos % 8;
+        if (byteIndex >= buffer.size()) buffer.push_back(0);
+        if (bit) buffer[byteIndex] |= (1 << (7 - bitIndexInByte));
+        currentBitPos++;
+    }
+
+    inline void merge(const BitBuffer& other) {
+        for (size_t i = 0; i < other.currentBitPos; ++i) {
+            size_t byteIndex = i / 8;
+            int bitOffset = 7 - (i % 8);
+            bool bit = (other.buffer[byteIndex] & (1 << bitOffset)) != 0;
+            appendBit(bit);
+        }
+    }
+    
+    inline std::vector<uint8_t> toByteArray() const { return buffer; }
+    inline size_t size() const { return currentBitPos; }
+};
+
+namespace BitBufferUtils {
+    // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
+    inline void appendToBitstream(BitBuffer& bitBuffer, long long num, int bits) {
+        for (int i = bits - 1; i >= 0; --i) bitBuffer.appendBit((num >> i) & 1);
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
+    inline int bitsRequiredNoSign(long long value) {
+        if (value == 0) return 1;
+        unsigned long long u_value = (value > 0) ? value : -value;
+        #if defined(__GNUC__) || defined(__clang__)
+        return 64 - __builtin_clzll(u_value);
+        #else
+        int length = 0; while(u_value > 0){ u_value >>= 1; length++; } return length == 0 ? 1 : length;
+        #endif
+    }
+}
+
+namespace { // Anonymous namespace for internal linkage helpers
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for Preprocessor
+    inline int bitLength(long long value) {
+        if (value == 0) return 1;
+        unsigned long long u_value = (value > 0) ? value : -value;
+        #if defined(__GNUC__) || defined(__clang__)
+        return 64 - __builtin_clzll(u_value);
+        #else
+        int length = 0; while (u_value > 0) { u_value >>= 1; length++; } return length;
+        #endif
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline long long calculateTotalLogCost(const std::vector<long long>& p1, const std::vector<long long>& p2, int dim) {
+        long long totalCost = 0;
+        for (int i = 0; i < dim; ++i) {
+            long long residual = p1[i] - p2[i];
+            totalCost += bitLength(residual >= 0 ? residual : -residual) + 1;
+        }
+        return totalCost;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline long long calculateBasePointStorageCost(const std::vector<long long>& basePoint, int dim) {
+        long long storageCost = 0;
+        for (int i = 0; i < dim; ++i) {
+            storageCost += bitLength(basePoint[i] >= 0 ? basePoint[i] : -basePoint[i]) + 1;
+        }
+        return storageCost;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline std::vector<long long> get_point_from_soa(const SoAData<long long>& soa_data, int row_idx) {
+        std::vector<long long> point(soa_data.dim);
+        for (int d = 0; d < soa_data.dim; ++d) {
+            point[d] = soa_data.columns[d][row_idx];
+        }
+        return point;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline std::vector<std::vector<long long>> convert_SoA_to_AoS(const SoAData<long long>& soa_data) {
+        if (soa_data.num_rows == 0) return {};
+        std::vector<std::vector<long long>> aos_data(soa_data.num_rows, std::vector<long long>(soa_data.dim));
+        for (int j = 0; j < soa_data.dim; ++j) {
+            for (int i = 0; i < soa_data.num_rows; ++i) {
+                aos_data[i][j] = soa_data.columns[j][i];
+            }
+        }
+        return aos_data;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    struct VectorHasher {
+        inline std::size_t operator()(const std::vector<long long>& vec) const {
+            std::size_t seed = vec.size();
+            for(long long i : vec) { seed ^= std::hash<long long>()(i) + 0x9e3779b9 + (seed << 6) + (seed >> 2); }
+            return seed;
+        }
+    };
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    struct MedSortHelper {
+        std::vector<long long> medoid;
+        long long size;
+        int original_index;
+        inline bool operator<(const MedSortHelper& other) const { return size < other.size; }
+    };
+
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline ClusteringResult sortResults(std::vector<std::vector<long long>>& medoids, std::vector<int>& assignment, std::vector<long long>& sizes) {
+        int k = medoids.size();
+        if (k == 0) return {{}, {}, {}};
+        std::vector<MedSortHelper> sorters;
+        sorters.reserve(k);
+        for(int i = 0; i < k; ++i) { sorters.push_back({medoids[i], sizes[i], i}); }
+        std::sort(sorters.begin(), sorters.end());
+        ClusteringResult res;
+        res.medoids.resize(k);
+        res.cluster_sizes.resize(k);
+        std::vector<int> old_to_new_map(k);
+        for(int i = 0; i < k; ++i) {
+            res.medoids[i] = sorters[i].medoid;
+            res.cluster_sizes[i] = sorters[i].size;
+            old_to_new_map[sorters[i].original_index] = i;
+        }
+        res.cluster_assignment.resize(assignment.size());
+        for(size_t i = 0; i < assignment.size(); ++i) {
+            if (assignment[i] != -1 && static_cast<size_t>(assignment[i]) < old_to_new_map.size()) {
+                res.cluster_assignment[i] = old_to_new_map[assignment[i]];
+            } else { res.cluster_assignment[i] = -1; }
+        }
+        return res;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
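+    // Maps signed to unsigned so small magnitudes stay small: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...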
+    inline long long zigzagEncode_scalar(long long n) {
+        return (n << 1) ^ (n >> 63);
+    }
+} // end anonymous namespace
+
+// Corresponds to: cluster_encoder_logic.cpp :: adaptiveGreedyBasePointSelection
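+// Greedy, single-pass base-point selection: each point is matched to the existing
+// base point with the cheapest residual (in bits). It is promoted to a new base
+// point only when the bits saved on itself, plus the bits saved by reassigning
+// cheaper neighbors from that cluster, exceed the cost of storing it verbatim.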
+inline ClusteringResult adaptiveGreedyBasePointSelection(const SoAData<long long>& page_data, int dim) {
+    auto data_aos = convert_SoA_to_AoS(page_data);
+    size_t n = data_aos.size();
+    if (n == 0) return {{}, {}, {}};
+    std::vector<std::vector<long long>> medoids_list;
+    std::unordered_set<std::vector<long long>, VectorHasher> existing_medoids_set;
+    std::vector<std::unordered_set<int>> points_in_cluster_list;
+    std::vector<int> point_to_leader_map(n, -1);
+    medoids_list.push_back(data_aos[0]);
+    existing_medoids_set.insert(data_aos[0]);
+    points_in_cluster_list.emplace_back();
+    points_in_cluster_list[0].insert(0);
+    point_to_leader_map[0] = 0;
+    for (size_t i = 1; i < n; ++i) {
+        const auto& current_point = data_aos[i];
+        if (existing_medoids_set.count(current_point)) {
+            int existing_leader_index = -1;
+            for (size_t j = 0; j < medoids_list.size(); ++j) { if (medoids_list[j] == current_point) { existing_leader_index = j; break; } }
+            if (existing_leader_index != -1) { points_in_cluster_list[existing_leader_index].insert(i); point_to_leader_map[i] = existing_leader_index; }
+            continue;
+        }
+        int best_leader_index = -1;
+        long long min_cost_to_existing_leader = std::numeric_limits<long long>::max();
+        for (size_t j = 0; j < medoids_list.size(); ++j) {
+            long long cost = calculateTotalLogCost(current_point, medoids_list[j], dim);
+            if (cost < min_cost_to_existing_leader) { min_cost_to_existing_leader = cost; best_leader_index = j; }
+        }
+        long long savings_from_current_point = min_cost_to_existing_leader;
+        long long savings_from_reassignment = 0;
+        const auto& points_in_best_leader_cluster = points_in_cluster_list[best_leader_index];
+        for (int point_index_in_cluster : points_in_best_leader_cluster) {
+            const auto& p = data_aos[point_index_in_cluster];
+            long long cost_to_old_leader = calculateTotalLogCost(p, medoids_list[best_leader_index], dim);
+            long long cost_to_new_potential_leader = calculateTotalLogCost(p, current_point, dim);
+            if (cost_to_new_potential_leader < cost_to_old_leader) { savings_from_reassignment += (cost_to_old_leader - cost_to_new_potential_leader); }
+        }
+        long long total_savings = savings_from_current_point + savings_from_reassignment;
+        long long storage_cost_for_new_point = calculateBasePointStorageCost(current_point, dim);
+        if (total_savings > storage_cost_for_new_point) {
+            int new_leader_id = medoids_list.size();
+            medoids_list.push_back(current_point);
+            existing_medoids_set.insert(current_point);
+            points_in_cluster_list.emplace_back();
+            points_in_cluster_list.back().insert(i);
+            point_to_leader_map[i] = new_leader_id;
+            std::vector<int> points_to_reassign;
+            for (int point_idx : points_in_cluster_list[best_leader_index]) {
+                const auto& p = data_aos[point_idx];
+                long long cost_to_old = calculateTotalLogCost(p, medoids_list[best_leader_index], dim);
+                long long cost_to_new = calculateTotalLogCost(p, current_point, dim);
+                if (cost_to_new < cost_to_old) { points_to_reassign.push_back(point_idx); }
+            }
+            for (int point_idx : points_to_reassign) {
+                points_in_cluster_list[best_leader_index].erase(point_idx);
+                points_in_cluster_list[new_leader_id].insert(point_idx);
+                point_to_leader_map[point_idx] = new_leader_id;
+            }
+        } else {
+            points_in_cluster_list[best_leader_index].insert(i);
+            point_to_leader_map[i] = best_leader_index;
+        }
+    }
+    int k = medoids_list.size();
+    auto discovered_medoids = medoids_list;
+    std::vector<long long> raw_cluster_sizes(k);
+    for (int i = 0; i < k; ++i) { raw_cluster_sizes[i] = points_in_cluster_list[i].size(); }
+    return sortResults(discovered_medoids, point_to_leader_map, raw_cluster_sizes);
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: residualCalculationZigzag_sorted
+inline std::vector<std::vector<long long>> residualCalculationZigzag_sorted(
+    const SoAData<long long>& data, const std::vector<std::vector<long long>>& medoids,
+    const std::vector<int>& assignments, const std::vector<long long>& sorted_cluster_sizes, int dim) 
+{
+    int n = data.num_rows;
+    size_t k = medoids.size();
+    if (n == 0) return {};
+    std::vector<int> writePointers(k);
+    int cumulativeCount = 0;
+    for (size_t i = 0; i < k; ++i) {
+        writePointers[i] = cumulativeCount;
+        cumulativeCount += static_cast<int>(sorted_cluster_sizes[i]);
+    }
+    std::vector<std::vector<long long>> sortedResidualSeries(n, std::vector<long long>(dim));
+    for (int i = 0; i < n; i++) {
+        int clusterId = assignments[i];
+        if (clusterId < 0 || static_cast<size_t>(clusterId) >= k) continue;
+        const auto& base = medoids[clusterId];
+        int targetIndex = writePointers[clusterId];
+        for (int j = 0; j < dim; j++) {
+            long long residual = data.columns[j][i] - base[j];
+            sortedResidualSeries[targetIndex][j] = zigzagEncode_scalar(residual);
+        }
+        writePointers[clusterId]++;
+    }
+    return sortedResidualSeries;
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
+namespace {
+    struct BasePointsResult { std::vector<long long> min_base; std::vector<int> max_base_bit_len; BitBuffer base_bitstream; };
+    inline BasePointsResult generateBitstreamEncodedBasePointsOptimized(const std::vector<std::vector<long long>>& medoids, int dim) {
+        if (medoids.empty()) return {std::vector<long long>(dim, 0), std::vector<int>(dim, 0), BitBuffer()};
+        size_t k = medoids.size();
+        std::vector<long long> min_base(dim, std::numeric_limits<long long>::max());
+        for (const auto& point : medoids) for (int i = 0; i < dim; ++i) min_base[i] = std::min(min_base[i], point[i]);
+        std::vector<std::vector<long long>> offset_medoids(k, std::vector<long long>(dim));
+        std::vector<int> max_base_bit_len(dim, 0);
+        for (size_t i = 0; i < k; ++i) {
+            for (int j = 0; j < dim; ++j) {
+                long long offset_value = medoids[i][j] - min_base[j];
+                offset_medoids[i][j] = offset_value;
+                max_base_bit_len[j] = std::max(max_base_bit_len[j], BitBufferUtils::bitsRequiredNoSign(offset_value));
+            }
+        }
+        BitBuffer base_bitstream;
+        for (const auto& offset_point : offset_medoids) for (int i = 0; i < dim; ++i) BitBufferUtils::appendToBitstream(base_bitstream, offset_point[i], max_base_bit_len[i]);
+        return {min_base, max_base_bit_len, base_bitstream};
+    }
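+    // Cluster sizes arrive sorted ascending (see sortResults), so consecutive
+    // deltas are non-negative. Deltas are grouped into blocks of block_size;
+    // each block records its max bit width as 8-bit metadata, then packs every
+    // delta in the block at that width.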
+    inline BitBuffer generateBitstreamFrequency(const std::vector<long long>& cluster_size, int block_size) {
+        if (cluster_size.empty()) return BitBuffer();
+        BitBuffer freq_bit, freq_meta;
+        std::vector<long long> cluster_delta(cluster_size.size());
+        if (!cluster_size.empty()) {
+            cluster_delta[0] = cluster_size[0];
+            for (size_t i = 1; i < cluster_size.size(); ++i) cluster_delta[i] = cluster_size[i] - cluster_size[i - 1];
+        }
+        int total_length = cluster_size.size();
+        int num_blocks = (total_length + block_size - 1) / block_size;
+        for (int block_idx = 0; block_idx < num_blocks; ++block_idx) {
+            int start = block_idx * block_size;
+            int end = std::min(start + block_size, total_length);
+            long long max_frequency = 0;
+            for (int i = start; i < end; ++i) max_frequency = std::max(max_frequency, cluster_delta[i]);
+            int max_freq_bit = BitBufferUtils::bitsRequiredNoSign(max_frequency);
+            BitBufferUtils::appendToBitstream(freq_meta, max_freq_bit, 8);
+            for (int i = start; i < end; ++i) BitBufferUtils::appendToBitstream(freq_bit, cluster_delta[i], max_freq_bit);
+        }
+        freq_meta.merge(freq_bit);
+        return freq_meta;
+    }
+    struct EncodedDataResult { BitBuffer residual_bitstream; std::vector<std::vector<int>> pack_res_metadata; int pack_num; int total_data_points; };
+    inline EncodedDataResult generateBitstreamEncodedData(const std::vector<std::vector<long long>>& residual_series, int pack_size, int dim) {
+        int total_data_points = residual_series.size();
+        if (total_data_points == 0) return {BitBuffer(), {}, 0, 0};
+        int pack_num = (total_data_points + pack_size - 1) / pack_size;
+        BitBuffer residual_bit;
+        std::vector<std::vector<int>> pack_res_metadata(pack_num, std::vector<int>(dim));
+        int pack_index = 0;
+        for (int pack_start = 0; pack_start < total_data_points; pack_start += pack_size) {
+            int pack_end = std::min(pack_start + pack_size, total_data_points);
+            std::vector<int> max_residual_bits(dim, 0);
+            for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) max_residual_bits[j] = std::max(max_residual_bits[j], BitBufferUtils::bitsRequiredNoSign(residual_series[i][j]));
+            pack_res_metadata[pack_index] = max_residual_bits;
+            for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) BitBufferUtils::appendToBitstream(residual_bit, residual_series[i][j], max_residual_bits[j]);
+            pack_index++;
+        }
+        return {residual_bit, pack_res_metadata, pack_num, total_data_points};
+    }
+} // end anonymous namespace
+
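+// Scales doubles to integers: infers the needed precision from each value's
+// printed form (std::to_string emits at most six fractional digits, so
+// max_decimal_places is capped at 6), multiplies by 10^max_decimal_places, and
+// shifts by the column minimum so all values are non-negative before clustering.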
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromDoubles(const std::vector<double>& page_doubles, int& dim) {
+    dim = 1;
+    if (page_doubles.empty()) return nullptr;
+    int max_decimal_places = 0;
+    for(double val : page_doubles) {
+        std::string s = std::to_string(val);
+        auto dot_pos = s.find('.');
+        if (dot_pos != std::string::npos) {
+            s.erase(s.find_last_not_of('0') + 1, std::string::npos);
+            if (!s.empty() && s.back() == '.') s.pop_back();
+            // Guard the all-zero fraction case ("3.000000" trims to "3"), where
+            // dot_pos would point past the end of the trimmed string.
+            if (dot_pos < s.length()) {
+                max_decimal_places = std::max(max_decimal_places, (int)(s.length() - dot_pos - 1));
+            }
+        }
+    }
+    double scale = std::pow(10, max_decimal_places);
+    auto integerSoA = std::make_unique<SoAData<long long>>();
+    integerSoA->dim = 1;
+    integerSoA->num_rows = page_doubles.size();
+    integerSoA->columns.resize(1);
+    integerSoA->columns[0].reserve(page_doubles.size());
+    for(double val : page_doubles) {
+        // llround avoids the truncation drift of a plain cast after scaling
+        // (e.g. 0.1 * 10 may evaluate to 0.999... and truncate to 0).
+        integerSoA->columns[0].push_back(std::llround(val * scale));
+    }
+    long long min_val_long = 0;
+    if(!integerSoA->columns[0].empty()){
+        min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+    }
+    for (long long& val : integerSoA->columns[0]) {
+        val -= min_val_long;
+    }
+    return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {max_decimal_places}, {min_val_long}});
+}
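+
+// preprocessPageFromLongs is called by IntAClusterEncoder::flush below but had no
+// definition in this header; a minimal sketch mirroring preprocessPageFromDoubles,
+// with no decimal scaling (integer inputs have max_decimal_places == 0).
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromLongs(const std::vector<long long>& page_longs, int& dim) {
+    dim = 1;
+    if (page_longs.empty()) return nullptr;
+    auto integerSoA = std::make_unique<SoAData<long long>>();
+    integerSoA->dim = 1;
+    integerSoA->num_rows = static_cast<int>(page_longs.size());
+    integerSoA->columns.assign(1, page_longs);
+    // Shift by the column minimum so all values are non-negative, as in the double path.
+    long long min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+    for (long long& val : integerSoA->columns[0]) {
+        val -= min_val_long;
+    }
+    return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {0}, {min_val_long}});
+}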
+
+} // namespace AClusterLogic
+
+namespace storage {
+
+// Base class for holding common data for ACluster encoders.
+template<typename BufferType>
+class AClusterEncoderData {
+protected:
+    AClusterEncoderData() {
+        page_size_ = common::g_config_value_.page_writer_max_point_num_;
+        points_buffer_.reserve(page_size_);
+    }
+
+    void reset_data() {
+        points_buffer_.clear();
+    }
+
+    std::vector<BufferType> points_buffer_;
+    uint32_t page_size_;
+};
+
+// ====================================================================
+// DoubleAClusterEncoder for FLOAT and DOUBLE
+// ====================================================================
+class DoubleAClusterEncoder : public Encoder, private AClusterEncoderData<double> {
+public:
+    DoubleAClusterEncoder() = default;
+    ~DoubleAClusterEncoder() override {}
+
+    void reset() override { reset_data(); }
+
+    int get_max_byte_size() override {
+        return page_size_ * sizeof(double) * 2;
+    }
+
+    // --- Overrides for supported types ---
+    int encode(double value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(value);
+        return common::E_OK;
+    }
+    int encode(float value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(static_cast<double>(value));
+        return common::E_OK;
+    }
+
+    // --- Override for flush ---
+    int flush(common::ByteStream &out_stream) override;
+
+    // --- Overrides for unsupported types ---
+    int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(int32_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(int64_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+// ====================================================================
+// IntAClusterEncoder for INT32 and INT64
+// ====================================================================
+class IntAClusterEncoder : public Encoder, private AClusterEncoderData<long long> {
+public:
+    IntAClusterEncoder() = default;
+    ~IntAClusterEncoder() override {}
+
+    void reset() override { reset_data(); }
+
+    int get_max_byte_size() override {
+        return page_size_ * sizeof(long long) * 2;
+    }
+
+    // --- Overrides for supported types ---
+    int encode(long long value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(value);
+        return common::E_OK;
+    }
+    int encode(int32_t value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(static_cast<long long>(value));
+        return common::E_OK;
+    }
+
+    // --- Override for flush ---
+    int flush(common::ByteStream &out_stream) override;
+
+    // --- Overrides for unsupported types ---
+    int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(float, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(double, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+
+// ====================================================================
+// Implementation of flush methods
+// ====================================================================
+
+// --- FLUSH IMPLEMENTATION FOR DoubleAClusterEncoder ---
+inline int DoubleAClusterEncoder::flush(common::ByteStream &out_stream) {
+    if (points_buffer_.empty()) {
+        return common::E_OK;
+    }
+    try {
+        int dim = 1;
+        int pack_size = 10;
+        int block_size = 10;
+
+        auto preproc_res_ptr = AClusterLogic::preprocessPageFromDoubles(points_buffer_, dim);
+        if (!preproc_res_ptr || !preproc_res_ptr->data) {
+            reset();
+            return common::E_OK; 
+        }
+        
+        // Run ACluster's core clustering algorithm.
+        AClusterLogic::ClusteringResult clustering_res = AClusterLogic::adaptiveGreedyBasePointSelection(*preproc_res_ptr->data, dim);
+        
+        using namespace AClusterLogic;
+        int page_k = clustering_res.medoids.size();
+        BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+        std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+        
+        EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+        BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
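+        // Per-page serialization; the field order below must match the per-page
+        // parse in decode_multidim_impl (see acluster_decoder.h).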
+        BitBuffer single_page_bitstream;
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+        BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+        for (int j = 0; j < dim; ++j) {
+            BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+            long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+        }
+        for (int j = 0; j < dim; ++j) {
+            long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+        }
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+        for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+        single_page_bitstream.merge(basepoint_res.base_bitstream);
+        single_page_bitstream.merge(fre_bitstream);
+        single_page_bitstream.merge(bitstream_res.residual_bitstream);
+        
+        std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+        uint32_t data_size = compressed_bytes.size();
+        out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+        if (data_size > 0) {
+            out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+        }
+    } catch (const std::exception& e) {
+        reset();
+        return common::E_ENCODING_ERROR;
+    }
+    reset();
+    return common::E_OK;
+}
+
+// --- FLUSH IMPLEMENTATION FOR IntAClusterEncoder ---
+inline int IntAClusterEncoder::flush(common::ByteStream &out_stream) {
+    if (points_buffer_.empty()) {
+        return common::E_OK;
+    }
+    try {
+        int dim = 1;
+        int pack_size = 10;
+        int block_size = 10;
+        
+        auto preproc_res_ptr = AClusterLogic::preprocessPageFromLongs(points_buffer_, dim);
+        if (!preproc_res_ptr || !preproc_res_ptr->data) {
+            reset();
+            return common::E_OK; 
+        }
+        
+        // Run ACluster's core clustering algorithm.
+        AClusterLogic::ClusteringResult clustering_res = AClusterLogic::adaptiveGreedyBasePointSelection(*preproc_res_ptr->data, dim);
+        
+        using namespace AClusterLogic;
+        int page_k = clustering_res.medoids.size();
+        BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+        std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+        
+        EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+        BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
+        BitBuffer single_page_bitstream;
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+        BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+        for (int j = 0; j < dim; ++j) {
+            BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+            long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+        }
+        for (int j = 0; j < dim; ++j) {
+            long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+        }
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+        for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+        single_page_bitstream.merge(basepoint_res.base_bitstream);
+        single_page_bitstream.merge(fre_bitstream);
+        single_page_bitstream.merge(bitstream_res.residual_bitstream);
+        
+        std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+        uint32_t data_size = compressed_bytes.size();
+        out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+        if (data_size > 0) {
+            out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+        }
+    } catch (const std::exception& e) {
+        reset();
+        return common::E_ENCODING_ERROR;
+    }
+    reset();
+    return common::E_OK;
+}
+
+} // namespace storage
+
+#endif // ENCODING_ACLUSTER_ENCODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/decoder_factory.h b/cpp/src/encoding/decoder_factory.h
index 8918c92..0f1b838 100644
--- a/cpp/src/encoding/decoder_factory.h
+++ b/cpp/src/encoding/decoder_factory.h
@@ -24,6 +24,8 @@
 #include "gorilla_decoder.h"
 #include "plain_decoder.h"
 #include "ts2diff_decoder.h"
+#include "kcluster_decoder.h"
+#include "acluster_decoder.h"
 
 namespace storage {
 
@@ -82,6 +84,24 @@
             } else {
                 ASSERT(false);
             }
+        } else if (encoding == common::KCLUSTER) {
+            if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+                ALLOC_AND_RETURN_DECODER(DoubleKClusterDecoder);
+            } else if (data_type == common::INT32 || data_type == common::INT64) {
+                ALLOC_AND_RETURN_DECODER(IntKClusterDecoder);
+            } else {
+                ASSERT(false);
+                return nullptr;
+            }
+        } else if (encoding == common::ACLUSTER) {
+            if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+                ALLOC_AND_RETURN_DECODER(DoubleAClusterDecoder);
+            } else if (data_type == common::INT32 || data_type == common::INT64) {
+                ALLOC_AND_RETURN_DECODER(IntAClusterDecoder);
+            } else {
+                ASSERT(false);
+                return nullptr;
+            }
         } else {
             // not support now
             ASSERT(false);
diff --git a/cpp/src/encoding/encoder_factory.h b/cpp/src/encoding/encoder_factory.h
index 0e582ae..b9f428d 100644
--- a/cpp/src/encoding/encoder_factory.h
+++ b/cpp/src/encoding/encoder_factory.h
@@ -25,6 +25,9 @@
 #include "gorilla_encoder.h"
 #include "plain_encoder.h"
 #include "ts2diff_encoder.h"
+#include "kcluster_encoder.h"
+#include "acluster_encoder.h"
+
 
 namespace storage {
 
@@ -111,6 +114,22 @@
             return nullptr;
         } else if (encoding == common::FREQ) {
             return nullptr;
+        } else if (encoding == common::KCLUSTER) { 
+            if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+                ALLOC_AND_RETURN_ENCODER(DoubleKClusterEncoder);
+            } else if (data_type == common::INT32 || data_type == common::INT64) {
+                ALLOC_AND_RETURN_ENCODER(IntKClusterEncoder);
+            } else {
+                ASSERT(false);
+            }
+        } else if (encoding == common::ACLUSTER) {
+            if (data_type == common::FLOAT || data_type == common::DOUBLE) {
+                ALLOC_AND_RETURN_ENCODER(DoubleAClusterEncoder);
+            } else if (data_type == common::INT32 || data_type == common::INT64) {
+                ALLOC_AND_RETURN_ENCODER(IntAClusterEncoder);
+            } else {
+                ASSERT(false);
+            }
         } else {
             // not support now
             ASSERT(false);
diff --git a/cpp/src/encoding/kcluster_decoder.h b/cpp/src/encoding/kcluster_decoder.h
new file mode 100644
index 0000000..07a0dc2
--- /dev/null
+++ b/cpp/src/encoding/kcluster_decoder.h
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ENCODING_KCLUSTER_DECODER_H
+#define ENCODING_KCLUSTER_DECODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <stdexcept>
+#include <limits>
+#include <cstdint>
+
+#include "decoder.h"
+#include "common/allocator/byte_stream.h"
+
+// ===================================================================
+//   Part 1: Core KCluster Decoding Logic
+// ===================================================================
+namespace KClusterDecodeLogic {
+
+// Corresponds to: cluster_encoder_logic.cpp :: BitStreamReader
+class BitStreamReader {
+public:
+    explicit BitStreamReader(const std::vector<uint8_t>& data)
+        : buffer(data), bufferSize(data.size() * 8), currentBitPos(0) {}
+
+    inline long long readBits(int numBits) {
+        if (numBits < 0 || numBits > 64) throw std::invalid_argument("Cannot read " + std::to_string(numBits) + " bits.");
+        if (currentBitPos + numBits > bufferSize) throw std::out_of_range("Not enough bits in stream to read " + std::to_string(numBits) + " bits.");
+
+        // Accumulate in an unsigned type: shifting a set bit into the sign bit
+        // of a signed long long (possible for 64-bit reads) is undefined behavior.
+        unsigned long long value = 0;
+        for (int i = 0; i < numBits; ++i) {
+            size_t byteIndex = currentBitPos / 8;
+            int bitIndex = 7 - (currentBitPos % 8);
+            bool bit = (buffer[byteIndex] & (1 << bitIndex)) != 0;
+            value = (value << 1) | (bit ? 1ULL : 0ULL);
+            currentBitPos++;
+        }
+        return static_cast<long long>(value);
+    }
+
+    inline bool has_more() const {
+        return currentBitPos < bufferSize;
+    }
+
+private:
+    const std::vector<uint8_t>& buffer;
+    const size_t bufferSize;
+    size_t currentBitPos;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp :: zigzagDecode
+inline long long zigzagDecode(long long n) { return (n >> 1) ^ (-(n & 1)); }
+
+// Corresponds to: cluster_encoder_logic.cpp :: decode_multidim_impl
+inline std::vector<std::vector<double>> decode_multidim_impl(const std::vector<uint8_t>& compressed_data) {
+    if (compressed_data.empty()) return {};
+    BitStreamReader reader(compressed_data);
+
+    int dim = reader.readBits(8);
+    int pack_size = reader.readBits(16);
+    int block_size = reader.readBits(16);
+    int page_count = reader.readBits(32);
+    int page_size = reader.readBits(16);
+    (void)page_size; // page_size is not needed here; pack_size is used below when unpacking residuals
+
+    std::vector<double> pow10_lookup; for (int i = 0; i < 20; ++i) pow10_lookup.push_back(std::pow(10, i));
+    std::vector<std::vector<double>> all_data_rows;
+    
+    for (int i = 0; i < page_count; ++i) {
+        int k = reader.readBits(16);
+        int page_data_points = reader.readBits(16);
+
+        std::vector<int> max_decimals(dim);
+        std::vector<long long> min_values(dim);
+        for (int j = 0; j < dim; ++j) {
+            max_decimals[j] = reader.readBits(8);
+            int bit_len = reader.readBits(8);
+            long long sign = reader.readBits(1);
+            long long abs_val = reader.readBits(bit_len);
+            min_values[j] = (sign == 0) ? abs_val : -abs_val;
+        }
+        
+        std::vector<long long> min_bases(dim);
+        std::vector<int> max_base_bit_len(dim);
+        for (int j = 0; j < dim; ++j) {
+            int bit_len = reader.readBits(8);
+            long long sign = reader.readBits(1);
+            long long abs_val = reader.readBits(bit_len);
+            min_bases[j] = (sign == 0) ? abs_val : -abs_val;
+            max_base_bit_len[j] = reader.readBits(8);
+        }
+        
+        int pack_num = reader.readBits(16);
+        std::vector<std::vector<int>> pack_metadata(pack_num, std::vector<int>(dim));
+        for (int p = 0; p < pack_num; ++p) for (int j = 0; j < dim; ++j) pack_metadata[p][j] = reader.readBits(8);
+        
+        std::vector<std::vector<long long>> medoids_long(k, std::vector<long long>(dim));
+        for (int m = 0; m < k; ++m) for (int j = 0; j < dim; ++j) medoids_long[m][j] = reader.readBits(max_base_bit_len[j]) + min_bases[j];
+        
+        std::vector<long long> cluster_sizes(k, 0);
+        if (k > 0) {
+            int num_freq_blocks = (k + block_size - 1) / block_size;
+            std::vector<int> max_bits_per_block(num_freq_blocks);
+            for (int b = 0; b < num_freq_blocks; ++b) max_bits_per_block[b] = reader.readBits(8);
+            std::vector<long long> deltas(k);
+            int freq_idx = 0;
+            for (int b = 0; b < num_freq_blocks && freq_idx < k; ++b) {
+                int max_bit_for_this_block = max_bits_per_block[b];
+                int end = std::min((b + 1) * block_size, k);
+                for (int j = freq_idx; j < end; ++j) deltas[j] = reader.readBits(max_bit_for_this_block);
+                freq_idx = end;
+            }
+            cluster_sizes[0] = deltas[0];
+            for (size_t j = 1; j < deltas.size(); ++j) cluster_sizes[j] = cluster_sizes[j - 1] + deltas[j];
+        }
+        
+        std::vector<std::vector<long long>> residual_series(page_data_points, std::vector<long long>(dim));
+        int data_counter = 0;
+        if (page_data_points > 0) {
+            for (int p = 0; p < pack_num; ++p) {
+                const auto& bits = pack_metadata[p];
+                int points_in_pack = (p == pack_num - 1) ? (page_data_points - (p * pack_size)) : pack_size;
+                for (int pt = 0; pt < points_in_pack; ++pt) {
+                    if (data_counter >= page_data_points) break;
+                    for (int j = 0; j < dim; ++j) residual_series[data_counter][j] = zigzagDecode(reader.readBits(bits[j]));
+                    data_counter++;
+                }
+            }
+        }
+        
+        if (page_data_points > 0 && k > 0) {
+            int current_point_idx = 0;
+            for (int medoid_idx = 0; medoid_idx < k; ++medoid_idx) {
+                long long points_in_this_cluster = cluster_sizes[medoid_idx];
+                const auto& base_point = medoids_long[medoid_idx];
+                for (int p_count = 0; p_count < points_in_this_cluster; ++p_count) {
+                    if (current_point_idx < page_data_points) {
+                        std::vector<double> row(dim);
+                        for (int j = 0; j < dim; ++j) {
+                            long long int_val = base_point[j] + residual_series[current_point_idx][j] + min_values[j];
+                            row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+                        }
+                        all_data_rows.push_back(row);
+                        current_point_idx++;
+                    }
+                }
+            }
+        } else if (page_data_points > 0) {
+            for (int d = 0; d < page_data_points; ++d) {
+                std::vector<double> row(dim);
+                for (int j = 0; j < dim; ++j) {
+                     long long int_val = residual_series[d][j] + min_values[j];
+                     row[j] = (max_decimals[j] > 0) ? (static_cast<double>(int_val) / pow10_lookup[max_decimals[j]]) : static_cast<double>(int_val);
+                }
+                all_data_rows.push_back(row);
+            }
+        }
+    }
+    return all_data_rows;
+}
+
+} // namespace AClusterDecodeLogic
+
+
+// ===================================================================
+//   Part 2: The Decoder classes that adapt the logic for TsFile
+// ===================================================================
+namespace storage {
+
+template<typename T>
+class AClusterDecoderBase : public Decoder {
+public:
+    AClusterDecoderBase() : current_read_index_(0), decoded_(false) {}
+    ~AClusterDecoderBase() override {}
+
+    int read_boolean(bool&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_int32(int32_t&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_int64(int64_t&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_float(float&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_double(double&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+    int read_String(common::String&, common::PageArena&, common::ByteStream&) override { return common::E_NOT_SUPPORT; }
+
+protected:
+    void decode_page_if_needed(common::ByteStream &in);
+    std::vector<T> decoded_points_;
+    size_t current_read_index_;
+    bool decoded_;
+};
+
+class DoubleAClusterDecoder : public AClusterDecoderBase<double> {
+public:
+    int read_double(double& val, common::ByteStream& in) override;
+    int read_float(float& val, common::ByteStream& in) override;
+};
+
+class IntAClusterDecoder : public AClusterDecoderBase<long long> {
+public:
+    int read_int64(int64_t& val, common::ByteStream& in) override;
+    int read_int32(int32_t& val, common::ByteStream& in) override;
+};
+
+
+// --- Implementation ---
+template<typename T>
+inline void AClusterDecoderBase<T>::decode_page_if_needed(common::ByteStream &in) {
+    if (decoded_ || !in.has_remaining()) return;
+
+    uint32_t data_size = 0;
+    uint32_t read_len = 0;
+    in.read_buf(reinterpret_cast<char*>(&data_size), sizeof(data_size), read_len);
+    if (read_len != sizeof(data_size)) throw std::runtime_error("Failed to read ACluster data block size");
+    if (data_size == 0) {
+        decoded_ = true;
+        return;
+    }
+
+    std::vector<uint8_t> compressed_block(data_size);
+    in.read_buf(reinterpret_cast<char*>(compressed_block.data()), data_size, read_len);
+    if (read_len != data_size) throw std::runtime_error("Failed to read ACluster data block");
+
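+    // flush() writes only the per-page payload (starting at the page's k
+    // field), while decode_multidim_impl expects a stream-level header, so a
+    // synthetic header (dim, pack_size, block_size, page count, page size)
+    // is prepended here using the encoder's fixed parameters.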
+    KClusterLogic::BitBuffer fake_bytestream;
+    int dim = 1, pack_size = 10, block_size = 10, page_count = 1, page_size = 0;
+    KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, dim, 8);
+    KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, pack_size, 16);
+    KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, block_size, 16);
+    KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_count, 32);
+    KClusterLogic::BitBufferUtils::appendToBitstream(fake_bytestream, page_size, 16);
+    
+    std::vector<uint8_t> final_bytes_to_decode = fake_bytestream.toByteArray();
+    final_bytes_to_decode.insert(final_bytes_to_decode.end(), compressed_block.begin(), compressed_block.end());
+    
+    std::vector<std::vector<double>> result_multidim = AClusterDecodeLogic::decode_multidim_impl(final_bytes_to_decode);
+    
+    if (!result_multidim.empty()) {
+        decoded_points_.reserve(result_multidim.size());
+        for(const auto& row : result_multidim) {
+            if (!row.empty()) {
+                decoded_points_.push_back(static_cast<T>(row[0]));
+            }
+        }
+    }
+    decoded_ = true;
+}
+
+inline int DoubleAClusterDecoder::read_double(double& val, common::ByteStream& in) {
+    try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+    if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+    val = decoded_points_[current_read_index_++];
+    return common::E_OK;
+}
+inline int DoubleAClusterDecoder::read_float(float& val, common::ByteStream& in) {
+    double d_val;
+    int ret = read_double(d_val, in);
+    if (ret == common::E_OK) val = static_cast<float>(d_val);
+    return ret;
+}
+
+inline int IntAClusterDecoder::read_int64(int64_t& val, common::ByteStream& in) {
+    try { decode_page_if_needed(in); } catch(const std::exception& e) { return common::E_DECODING_ERROR; }
+    if (current_read_index_ >= decoded_points_.size()) return common::E_NO_MORE_DATA;
+    val = decoded_points_[current_read_index_++];
+    return common::E_OK;
+}
+inline int IntAClusterDecoder::read_int32(int32_t& val, common::ByteStream& in) {
+    long long ll_val;
+    int ret = read_int64(ll_val, in);
+    if (ret == common::E_OK) {
+        if (ll_val < std::numeric_limits<int32_t>::min() || ll_val > std::numeric_limits<int32_t>::max()) {
+            return common::E_DECODING_ERROR;
+        }
+        val = static_cast<int32_t>(ll_val);
+    }
+    return ret;
+}
+
+} // namespace storage
+#endif // ENCODING_ACLUSTER_DECODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/kcluster_encoder.h b/cpp/src/encoding/kcluster_encoder.h
new file mode 100644
index 0000000..0da8f41
--- /dev/null
+++ b/cpp/src/encoding/kcluster_encoder.h
@@ -0,0 +1,834 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#ifndef ENCODING_KCLUSTER_ENCODER_H
+#define ENCODING_KCLUSTER_ENCODER_H
+
+#include <vector>
+#include <string>
+#include <memory>
+#include <cmath>
+#include <numeric>
+#include <algorithm>
+#include <stdexcept>
+#include <limits>
+#include <unordered_set>
+#include <random>
+#include <sstream>
+#include <functional>
+
+#include "encoder.h"
+#include "common/allocator/byte_stream.h"
+#include "common/global.h"
+
+// ===================================================================
+//   Part 1: Core KCluster Algorithm Logic
+//   All implementations are encapsulated in the KClusterLogic namespace
+// ===================================================================
+namespace KClusterLogic {
+
+// Corresponds to: algorithms/cluster_common.h
+template<typename T>
+struct SoAData {
+    std::vector<std::vector<T>> columns;
+    int dim = 0;
+    int num_rows = 0;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct ClusteringResult {
+    std::vector<std::vector<long long>> medoids;
+    std::vector<int> cluster_assignment;
+    std::vector<long long> cluster_sizes;
+};
+
+// Corresponds to: algorithms/cluster_common.h
+struct PreprocessorResult {
+    std::unique_ptr<SoAData<long long>> data;
+    std::vector<int> max_decimal_places;
+    std::vector<long long> min_values;
+};
+
+// Corresponds to: cluster_encoder_logic.cpp
+struct BitBuffer {
+    std::vector<uint8_t> buffer;
+    size_t currentBitPos = 0;
+    
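+    // Bits are appended MSB-first within each byte (bit 7 down to bit 0),
+    // matching the read order of the decoders' BitStreamReader.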
+    inline void appendBit(bool bit) {
+        size_t byteIndex = currentBitPos / 8;
+        int bitIndexInByte = currentBitPos % 8;
+        if (byteIndex >= buffer.size()) buffer.push_back(0);
+        if (bit) buffer[byteIndex] |= (1 << (7 - bitIndexInByte));
+        currentBitPos++;
+    }
+
+    inline void merge(const BitBuffer& other) {
+        for (size_t i = 0; i < other.currentBitPos; ++i) {
+            size_t byteIndex = i / 8;
+            int bitOffset = 7 - (i % 8);
+            bool bit = (other.buffer[byteIndex] & (1 << bitOffset)) != 0;
+            appendBit(bit);
+        }
+    }
+    
+    inline std::vector<uint8_t> toByteArray() const { return buffer; }
+    inline size_t size() const { return currentBitPos; }
+};
+
+namespace BitBufferUtils {
+    // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
+    inline void appendToBitstream(BitBuffer& bitBuffer, long long num, int bits) {
+        for (int i = bits - 1; i >= 0; --i) bitBuffer.appendBit((num >> i) & 1);
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: BitBufferUtils
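+    // Returns the minimum bits for the magnitude only (callers store the sign
+    // separately), e.g. bitsRequiredNoSign(5) == 3 (101b); 0 maps to 1 bit.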
+    inline int bitsRequiredNoSign(long long value) {
+        if (value == 0) return 1;
+        unsigned long long u_value = (value > 0) ? value : -value;
+        #if defined(__GNUC__) || defined(__clang__)
+        return 64 - __builtin_clzll(u_value);
+        #else
+        int length = 0; while(u_value > 0){ u_value >>= 1; length++; } return length == 0 ? 1 : length;
+        #endif
+    }
+}
+
+namespace { // Anonymous namespace for internal linkage helpers
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for Preprocessor
+    inline int bitLength(long long value) {
+        if (value == 0) return 1;
+        unsigned long long u_value = (value > 0) ? value : -value;
+        #if defined(__GNUC__) || defined(__clang__)
+        return 64 - __builtin_clzll(u_value);
+        #else
+        int length = 0; while (u_value > 0) { u_value >>= 1; length++; } return length;
+        #endif
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline long long calculateTotalLogCost(const std::vector<long long>& p1, const std::vector<long long>& p2, int dim) {
+        long long totalCost = 0;
+        for (int i = 0; i < dim; ++i) {
+            long long residual = p1[i] - p2[i];
+            totalCost += bitLength(residual >= 0 ? residual : -residual) + 1;
+        }
+        return totalCost;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline long long manhattanDistance(const std::vector<long long>& p1, const std::vector<long long>& p2) {
+        long long sum = 0;
+        for (size_t i = 0; i < p1.size(); ++i) { sum += std::abs(p1[i] - p2[i]); }
+        return sum;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline std::vector<long long> get_point_from_soa(const SoAData<long long>& soa_data, int row_idx) {
+        std::vector<long long> point(soa_data.dim);
+        for (int d = 0; d < soa_data.dim; ++d) {
+            point[d] = soa_data.columns[d][row_idx];
+        }
+        return point;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline std::vector<std::vector<long long>> convert_SoA_to_AoS(const SoAData<long long>& soa_data) {
+        if (soa_data.num_rows == 0) return {};
+        std::vector<std::vector<long long>> aos_data(soa_data.num_rows, std::vector<long long>(soa_data.dim));
+        for (int j = 0; j < soa_data.dim; ++j) {
+            for (int i = 0; i < soa_data.num_rows; ++i) {
+                aos_data[i][j] = soa_data.columns[j][i];
+            }
+        }
+        return aos_data;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    struct VectorHasher {
+        inline std::size_t operator()(const std::vector<long long>& vec) const {
+            std::size_t seed = vec.size();
+            for(long long i : vec) { seed ^= std::hash<long long>()(i) + 0x9e3779b9 + (seed << 6) + (seed >> 2); }
+            return seed;
+        }
+    };
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    struct MedSortHelper {
+        std::vector<long long> medoid;
+        long long size;
+        int original_index;
+        inline bool operator<(const MedSortHelper& other) const { return size < other.size; }
+    };
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for clustering
+    inline ClusteringResult sortResults(std::vector<std::vector<long long>>& medoids, std::vector<int>& assignment, std::vector<long long>& sizes) {
+        int k = medoids.size();
+        if (k == 0) return {{}, {}, {}};
+        std::vector<MedSortHelper> sorters;
+        sorters.reserve(k);
+        for(int i = 0; i < k; ++i) { sorters.push_back({medoids[i], sizes[i], i}); }
+        std::sort(sorters.begin(), sorters.end());
+        ClusteringResult res;
+        res.medoids.resize(k);
+        res.cluster_sizes.resize(k);
+        std::vector<int> old_to_new_map(k);
+        for(int i = 0; i < k; ++i) {
+            res.medoids[i] = sorters[i].medoid;
+            res.cluster_sizes[i] = sorters[i].size;
+            old_to_new_map[sorters[i].original_index] = i;
+        }
+        res.cluster_assignment.resize(assignment.size());
+        for(size_t i = 0; i < assignment.size(); ++i) {
+            if (assignment[i] != -1 && static_cast<size_t>(assignment[i]) < old_to_new_map.size()) {
+                res.cluster_assignment[i] = old_to_new_map[assignment[i]];
+            } else { res.cluster_assignment[i] = -1; }
+        }
+        return res;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
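+    // ZigZag maps signed to unsigned so small magnitudes stay small:
+    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...; e.g. zigzagEncode_scalar(-3) == 5.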
+    inline long long zigzagEncode_scalar(long long n) {
+        return (n << 1) ^ (n >> 63);
+    }
+} // end anonymous namespace
+
+
+// Corresponds to: cluster_encoder_logic.cpp :: acceleratedInitialization
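+// k-means++-style seeding under Manhattan distance: the first medoid is drawn
+// uniformly at random, and each subsequent medoid is sampled with probability
+// proportional to a point's distance to its nearest already-chosen medoid,
+// skipping duplicate points.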
+inline std::vector<std::vector<long long>> acceleratedInitialization(const SoAData<long long>& data, int k, std::mt19937& gen) {
+    std::vector<std::vector<long long>> medoids;
+    if (data.num_rows == 0 || k <= 0) return medoids;
+    medoids.reserve(k);
+    std::unordered_set<std::vector<long long>, VectorHasher> selected_medoids_set;
+    std::uniform_int_distribution<> distrib(0, data.num_rows - 1);
+    int first_index = distrib(gen);
+    auto first_medoid = get_point_from_soa(data, first_index);
+    medoids.push_back(first_medoid);
+    selected_medoids_set.insert(first_medoid);
+    std::vector<long long> distances(data.num_rows);
+    for (int i = 0; i < data.num_rows; ++i) {
+        distances[i] = manhattanDistance(get_point_from_soa(data, i), medoids[0]);
+    }
+    for (int i = 1; i < k; ++i) {
+        if (selected_medoids_set.size() >= (size_t)data.num_rows) break;
+        long long total_distance = std::accumulate(distances.begin(), distances.end(), 0LL);
+        if (total_distance == 0) {
+            int next_idx = 0;
+            while(selected_medoids_set.count(get_point_from_soa(data, next_idx))) {
+                next_idx = (next_idx + 1) % data.num_rows;
+            }
+            auto new_medoid = get_point_from_soa(data, next_idx);
+            medoids.push_back(new_medoid);
+            selected_medoids_set.insert(new_medoid);
+            continue;
+        }
+        std::uniform_real_distribution<double> dist_double(0.0, (double)total_distance);
+        long long rand_val = static_cast<long long>(dist_double(gen));
+        long long current_sum = 0;
+        int chosen_idx = -1;
+        for(int j = 0; j < data.num_rows; ++j) {
+            current_sum += distances[j];
+            if (current_sum >= rand_val) {
+                chosen_idx = j;
+                break;
+            }
+        }
+        if (chosen_idx == -1) chosen_idx = data.num_rows - 1;
+        while(selected_medoids_set.count(get_point_from_soa(data, chosen_idx))) {
+            chosen_idx = (chosen_idx + 1) % data.num_rows;
+        }
+        auto new_medoid = get_point_from_soa(data, chosen_idx);
+        medoids.push_back(new_medoid);
+        selected_medoids_set.insert(new_medoid);
+        for (int idx = 0; idx < data.num_rows; ++idx) {
+            long long dist_new = manhattanDistance(get_point_from_soa(data, idx), new_medoid);
+            distances[idx] = std::min(distances[idx], dist_new);
+        }
+    }
+    return medoids;
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: updateMedoids
+inline std::vector<std::vector<long long>> updateMedoids(const SoAData<long long>& data, const std::vector<int>& clusters, const std::vector<std::vector<long long>>& currentMedoids, int dim) {
+    int k = currentMedoids.size();
+    if (k == 0) return {};
+    std::vector<std::vector<long long>> newMedoids(k, std::vector<long long>(dim));
+    std::vector<std::vector<int>> cluster_point_indices(k);
+    for (int i = 0; i < data.num_rows; ++i) {
+        if (clusters[i] != -1 && clusters[i] < k) { cluster_point_indices[clusters[i]].push_back(i); }
+    }
+    for (int medoidIndex = 0; medoidIndex < k; ++medoidIndex) {
+        const auto& member_indices = cluster_point_indices[medoidIndex];
+        if (member_indices.empty()) { newMedoids[medoidIndex] = currentMedoids[medoidIndex]; continue; }
+        long long best_total_cost = std::numeric_limits<long long>::max();
+        int best_medoid_candidate_idx_in_data = -1;
+        for (int candidate_idx_in_data : member_indices) {
+            long long current_total_cost = 0;
+            auto candidate_medoid = get_point_from_soa(data, candidate_idx_in_data);
+            for (int member_idx_in_data : member_indices) {
+                if (candidate_idx_in_data == member_idx_in_data) continue;
+                auto other_point = get_point_from_soa(data, member_idx_in_data);
+                for(int d = 0; d < dim; ++d) {
+                    long long residual = candidate_medoid[d] - other_point[d];
+                    current_total_cost += bitLength(residual >= 0 ? residual : -residual) + 1;
+                }
+            }
+            if (current_total_cost < best_total_cost) {
+                best_total_cost = current_total_cost;
+                best_medoid_candidate_idx_in_data = candidate_idx_in_data;
+            }
+        }
+        if (best_medoid_candidate_idx_in_data != -1) {
+            newMedoids[medoidIndex] = get_point_from_soa(data, best_medoid_candidate_idx_in_data);
+        } else { newMedoids[medoidIndex] = currentMedoids[medoidIndex]; }
+    }
+    return newMedoids;
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: kMedoidLogCost
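+// k-medoids with a compression-oriented objective: the assignment cost is the
+// total bit length needed to store sign + magnitude of each residual (see
+// calculateTotalLogCost), so the iterations minimise the encoded residual
+// size rather than a geometric error.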
+inline ClusteringResult kMedoidLogCost(const SoAData<long long>& data, int k, int max_iter, double tol, int dim) {
+    int n = data.num_rows;
+    if (n == 0) return {{}, {}, {}};
+    auto aos_data = convert_SoA_to_AoS(data);
+    std::sort(aos_data.begin(), aos_data.end());
+    auto last = std::unique(aos_data.begin(), aos_data.end());
+    int distinctCount = std::distance(aos_data.begin(), last);
+    if (distinctCount < k) k = distinctCount;
+    
+    std::random_device rd; std::mt19937 gen(rd());
+    auto medoids = acceleratedInitialization(data, k, gen);
+    k = medoids.size();
+    if (k==0) return {};
+    
+    std::vector<int> cluster_assignment(n);
+    long long previous_total_cost = std::numeric_limits<long long>::max();
+    
+    for (int iter = 0; iter < max_iter; ++iter) {
+        long long total_cost_this_round = 0;
+        for (int i = 0; i < n; ++i) {
+            long long min_cost = std::numeric_limits<long long>::max();
+            int assigned_medoid_idx = -1;
+            auto current_point = get_point_from_soa(data, i);
+            for (int m = 0; m < k; ++m) {
+                long long cost = calculateTotalLogCost(current_point, medoids[m], dim);
+                if (cost < min_cost) {
+                    min_cost = cost;
+                    assigned_medoid_idx = m;
+                } else if (cost == min_cost) {
+                    if (medoids[m] == current_point) {
+                        assigned_medoid_idx = m;
+                        break;
+                    }
+                }
+            }
+            cluster_assignment[i] = assigned_medoid_idx;
+            total_cost_this_round += min_cost;
+        }
+        if (iter > 0 && std::abs((double)previous_total_cost - total_cost_this_round) < tol) break;
+        previous_total_cost = total_cost_this_round;
+        auto new_medoids = updateMedoids(data, cluster_assignment, medoids, dim);
+        if (medoids == new_medoids) break;
+        medoids = new_medoids;
+    }
+    std::vector<long long> cluster_sizes(k, 0);
+    for (int assignment : cluster_assignment) { if (assignment != -1 && (size_t)assignment < cluster_sizes.size()) cluster_sizes[assignment]++; }
+    return sortResults(medoids, cluster_assignment, cluster_sizes);
+}
+
+// Corresponds to: cluster_encoder_logic.cpp :: residualCalculationZigzag_sorted
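+// Regroups points so that each cluster's members are contiguous (clusters in
+// sorted order) and stores every point as the zigzag-encoded residual against
+// its cluster's medoid; the decoder undoes the grouping via the cluster sizes.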
+inline std::vector<std::vector<long long>> residualCalculationZigzag_sorted(
+    const SoAData<long long>& data, const std::vector<std::vector<long long>>& medoids,
+    const std::vector<int>& assignments, const std::vector<long long>& sorted_cluster_sizes, int dim) 
+{
+    int n = data.num_rows;
+    size_t k = medoids.size();
+    if (n == 0) return {};
+    std::vector<int> writePointers(k);
+    int cumulativeCount = 0;
+    for (size_t i = 0; i < k; ++i) {
+        writePointers[i] = cumulativeCount;
+        cumulativeCount += static_cast<int>(sorted_cluster_sizes[i]);
+    }
+    std::vector<std::vector<long long>> sortedResidualSeries(n, std::vector<long long>(dim));
+    for (int i = 0; i < n; i++) {
+        int clusterId = assignments[i];
+        if (clusterId < 0 || static_cast<size_t>(clusterId) >= k) continue;
+        const auto& base = medoids[clusterId];
+        int targetIndex = writePointers[clusterId];
+        for (int j = 0; j < dim; j++) {
+            long long residual = data.columns[j][i] - base[j];
+            sortedResidualSeries[targetIndex][j] = zigzagEncode_scalar(residual);
+        }
+        writePointers[clusterId]++;
+    }
+    return sortedResidualSeries;
+}
+
+namespace {
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
+    struct BasePointsResult { std::vector<long long> min_base; std::vector<int> max_base_bit_len; BitBuffer base_bitstream; };
+    inline BasePointsResult generateBitstreamEncodedBasePointsOptimized(const std::vector<std::vector<long long>>& medoids, int dim) {
+        if (medoids.empty()) return {std::vector<long long>(dim, 0), std::vector<int>(dim, 0), BitBuffer()};
+        size_t k = medoids.size();
+        std::vector<long long> min_base(dim, std::numeric_limits<long long>::max());
+        for (const auto& point : medoids) for (int i = 0; i < dim; ++i) min_base[i] = std::min(min_base[i], point[i]);
+        std::vector<std::vector<long long>> offset_medoids(k, std::vector<long long>(dim));
+        std::vector<int> max_base_bit_len(dim, 0);
+        for (size_t i = 0; i < k; ++i) {
+            for (int j = 0; j < dim; ++j) {
+                long long offset_value = medoids[i][j] - min_base[j];
+                offset_medoids[i][j] = offset_value;
+                max_base_bit_len[j] = std::max(max_base_bit_len[j], BitBufferUtils::bitsRequiredNoSign(offset_value));
+            }
+        }
+        BitBuffer base_bitstream;
+        for (const auto& offset_point : offset_medoids) for (int i = 0; i < dim; ++i) BitBufferUtils::appendToBitstream(base_bitstream, offset_point[i], max_base_bit_len[i]);
+        return {min_base, max_base_bit_len, base_bitstream};
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
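+    // Delta-encodes the ascending cluster sizes (sortResults guarantees
+    // non-negative deltas) and bit-packs each block of `block_size` deltas at
+    // the block's maximum width: all 8-bit per-block widths are emitted
+    // first, followed by the packed deltas.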
+    inline BitBuffer generateBitstreamFrequency(const std::vector<long long>& cluster_size, int block_size) {
+        if (cluster_size.empty()) return BitBuffer();
+        BitBuffer freq_bit, freq_meta;
+        std::vector<long long> cluster_delta(cluster_size.size());
+        cluster_delta[0] = cluster_size[0];
+        for (size_t i = 1; i < cluster_size.size(); ++i) cluster_delta[i] = cluster_size[i] - cluster_size[i - 1];
+        int total_length = cluster_size.size();
+        int num_blocks = (total_length + block_size - 1) / block_size;
+        for (int block_idx = 0; block_idx < num_blocks; ++block_idx) {
+            int start = block_idx * block_size;
+            int end = std::min(start + block_size, total_length);
+            long long max_frequency = 0;
+            for (int i = start; i < end; ++i) max_frequency = std::max(max_frequency, cluster_delta[i]);
+            int max_freq_bit = BitBufferUtils::bitsRequiredNoSign(max_frequency);
+            BitBufferUtils::appendToBitstream(freq_meta, max_freq_bit, 8);
+            for (int i = start; i < end; ++i) BitBufferUtils::appendToBitstream(freq_bit, cluster_delta[i], max_freq_bit);
+        }
+        freq_meta.merge(freq_bit);
+        return freq_meta;
+    }
+    // Corresponds to: cluster_encoder_logic.cpp :: anonymous helpers for encoding
+    struct EncodedDataResult { BitBuffer residual_bitstream; std::vector<std::vector<int>> pack_res_metadata; int pack_num; int total_data_points; };
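+    // Bit-packs residuals in groups of `pack_size` points; each pack records
+    // one bit width per dimension (the maximum needed within that pack), so a
+    // run of small residuals is not penalised by an outlier elsewhere.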
+    inline EncodedDataResult generateBitstreamEncodedData(const std::vector<std::vector<long long>>& residual_series, int pack_size, int dim) {
+        int total_data_points = residual_series.size();
+        if (total_data_points == 0) return {BitBuffer(), {}, 0, 0};
+        int pack_num = (total_data_points + pack_size - 1) / pack_size;
+        BitBuffer residual_bit;
+        std::vector<std::vector<int>> pack_res_metadata(pack_num, std::vector<int>(dim));
+        int pack_index = 0;
+        for (int pack_start = 0; pack_start < total_data_points; pack_start += pack_size) {
+            int pack_end = std::min(pack_start + pack_size, total_data_points);
+            std::vector<int> max_residual_bits(dim, 0);
+            for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) max_residual_bits[j] = std::max(max_residual_bits[j], BitBufferUtils::bitsRequiredNoSign(residual_series[i][j]));
+            pack_res_metadata[pack_index] = max_residual_bits;
+            for (int i = pack_start; i < pack_end; ++i) for (int j = 0; j < dim; ++j) BitBufferUtils::appendToBitstream(residual_bit, residual_series[i][j], max_residual_bits[j]);
+            pack_index++;
+        }
+        return {residual_bit, pack_res_metadata, pack_num, total_data_points};
+    }
+} // end anonymous namespace
+
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromDoubles(const std::vector<double>& page_doubles, int& dim) {
+    dim = 1;
+    if (page_doubles.empty()) return nullptr;
+    int max_decimal_places = 0;
+    for(double val : page_doubles) {
+        std::string s = std::to_string(val);
+        auto dot_pos = s.find('.');
+        if (dot_pos != std::string::npos) {
+            s.erase(s.find_last_not_of('0') + 1, std::string::npos);
+            if (!s.empty() && s.back() == '.') s.pop_back();
+            max_decimal_places = std::max(max_decimal_places, (int)(s.length() - dot_pos - 1));
+        }
+    }
+    double scale = std::pow(10, max_decimal_places);
+    auto integerSoA = std::make_unique<SoAData<long long>>();
+    integerSoA->dim = 1;
+    integerSoA->num_rows = page_doubles.size();
+    integerSoA->columns.resize(1);
+    integerSoA->columns[0].reserve(page_doubles.size());
+    for(double val : page_doubles) {
+        // Round rather than truncate: val * scale can land just below the
+        // intended integer (e.g. 0.3 * 10 == 2.999...), which would break
+        // exact reconstruction on decode.
+        integerSoA->columns[0].push_back(std::llround(val * scale));
+    }
+    long long min_val_long = 0;
+    if(!integerSoA->columns[0].empty()){
+        min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+    }
+    for (long long& val : integerSoA->columns[0]) {
+        val -= min_val_long;
+    }
+    return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {max_decimal_places}, {min_val_long}});
+}
+
+inline std::unique_ptr<PreprocessorResult> preprocessPageFromLongs(const std::vector<long long>& page_longs, int& dim) {
+    dim = 1;
+    if (page_longs.empty()) return nullptr;
+    
+    auto integerSoA = std::make_unique<SoAData<long long>>();
+    integerSoA->dim = 1;
+    integerSoA->num_rows = page_longs.size();
+    integerSoA->columns.resize(1);
+    integerSoA->columns[0] = page_longs; // Directly copy the data
+    
+    long long min_val_long = 0;
+    if(!integerSoA->columns[0].empty()){
+        min_val_long = *std::min_element(integerSoA->columns[0].begin(), integerSoA->columns[0].end());
+    }
+    for (long long& val : integerSoA->columns[0]) {
+        val -= min_val_long;
+    }
+    
+    // For INT types, decimal places is 0.
+    return std::make_unique<PreprocessorResult>(PreprocessorResult{std::move(integerSoA), {0}, {min_val_long}});
+}
+
+} // namespace KClusterLogic
+
+
+
+namespace storage {
+
+template<typename BufferType>
+class KClusterEncoderData {
+protected:
+    KClusterEncoderData() {
+        page_size_ = common::g_config_value_.page_writer_max_point_num_;
+        points_buffer_.reserve(page_size_);
+        k_ = 100;
+    }
+
+    void reset_data() {
+        points_buffer_.clear();
+    }
+
+    std::vector<BufferType> points_buffer_;
+    uint32_t page_size_;
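+    // Target number of medoids per flushed page, passed as k to kMedoidLogCost.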
+    int k_;
+};
+
+// ====================================================================
+// DoubleKClusterEncoder for FLOAT and DOUBLE
+// ====================================================================
+
+class DoubleKClusterEncoder : public Encoder, private KClusterEncoderData<double> {
+public:
+    DoubleKClusterEncoder() = default;
+    ~DoubleKClusterEncoder() override {}
+
+    void reset() override { reset_data(); }
+
+    int get_max_byte_size() override {
+        return page_size_ * sizeof(double) * 2;
+    }
+
+    // --- Overrides for supported types ---
+    int encode(double value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(value);
+        return common::E_OK;
+    }
+    int encode(float value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(static_cast<double>(value));
+        return common::E_OK;
+    }
+
+    // --- Override for flush ---
+    int flush(common::ByteStream &out_stream) override;
+
+    // --- Overrides for unsupported types ---
+    int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(int32_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(int64_t, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+// ====================================================================
+// IntKClusterEncoder for INT32 and INT64
+// ====================================================================
+class IntKClusterEncoder : public Encoder, private KClusterEncoderData<long long> {
+public:
+    IntKClusterEncoder() = default;
+    ~IntKClusterEncoder() override {}
+
+    void reset() override { reset_data(); }
+
+    int get_max_byte_size() override {
+        return page_size_ * sizeof(long long) * 2;
+    }
+
+    // --- Overrides for supported types ---
+    int encode(int64_t value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(value);
+        return common::E_OK;
+    }
+    int encode(int32_t value, common::ByteStream &out_stream) override {
+        points_buffer_.push_back(static_cast<long long>(value));
+        return common::E_OK;
+    }
+
+    // --- Override for flush ---
+    int flush(common::ByteStream &out_stream) override;
+
+    // --- Overrides for unsupported types ---
+    int encode(bool, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(float, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(double, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+    int encode(common::String, common::ByteStream &) override { return common::E_TYPE_NOT_MATCH; }
+};
+
+inline int DoubleKClusterEncoder::flush(common::ByteStream &out_stream) {
+    if (points_buffer_.empty()) {
+        return common::E_OK;
+    }
+    try {
+        int dim = 1;
+        int pack_size = 10;
+        int block_size = 10;
+
+        auto preproc_res_ptr = KClusterLogic::preprocessPageFromDoubles(points_buffer_, dim);
+        if (!preproc_res_ptr || !preproc_res_ptr->data) {
+            reset();
+            return common::E_OK; 
+        }
+        
+        KClusterLogic::ClusteringResult clustering_res = KClusterLogic::kMedoidLogCost(*preproc_res_ptr->data, k_, 10, 1.0e-3, dim);
+        
+        using namespace KClusterLogic;
+        int page_k = clustering_res.medoids.size();
+        BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+        std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+        
+        EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+        BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
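+        // Page payload layout: [k:16][point count:16]; per dimension
+        // [decimal places:8][min-value bit length:8][sign:1][|min value|];
+        // per dimension [min base: 8+1+len][base bit width:8]; [pack count:16];
+        // per-pack residual widths (8 bits per dimension); then the medoid,
+        // frequency and residual bitstreams.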
+        BitBuffer single_page_bitstream;
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+        BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+        for (int j = 0; j < dim; ++j) {
+            BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+            long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+        }
+        for (int j = 0; j < dim; ++j) {
+            long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+        }
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+        for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+        single_page_bitstream.merge(basepoint_res.base_bitstream);
+        single_page_bitstream.merge(fre_bitstream);
+        single_page_bitstream.merge(bitstream_res.residual_bitstream);
+        
+        std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+        uint32_t data_size = compressed_bytes.size();
+        out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+        if (data_size > 0) {
+            out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+        }
+    } catch (const std::exception& e) {
+        reset();
+        return common::E_ENCODING_ERROR;
+    }
+    reset();
+    return common::E_OK;
+}
+
+inline int IntKClusterEncoder::flush(common::ByteStream &out_stream) {
+    if (points_buffer_.empty()) {
+        return common::E_OK;
+    }
+    try {
+        int dim = 1;
+        int pack_size = 10;
+        int block_size = 10;
+        
+        // CORRECTED: Call preprocessPageFromLongs for integer data
+        auto preproc_res_ptr = KClusterLogic::preprocessPageFromLongs(points_buffer_, dim);
+        if (!preproc_res_ptr || !preproc_res_ptr->data) {
+            reset();
+            return common::E_OK; 
+        }
+        
+        KClusterLogic::ClusteringResult clustering_res = KClusterLogic::kMedoidLogCost(*preproc_res_ptr->data, k_, 10, 1.0e-3, dim);
+        
+        using namespace KClusterLogic;
+        int page_k = clustering_res.medoids.size();
+        BitBuffer fre_bitstream = generateBitstreamFrequency(clustering_res.cluster_sizes, block_size);
+        std::vector<std::vector<long long>> residuals = residualCalculationZigzag_sorted(*preproc_res_ptr->data, clustering_res.medoids, clustering_res.cluster_assignment, clustering_res.cluster_sizes, dim);
+        
+        EncodedDataResult bitstream_res = generateBitstreamEncodedData(residuals, pack_size, dim);
+        BasePointsResult basepoint_res = generateBitstreamEncodedBasePointsOptimized(clustering_res.medoids, dim);
+
+        BitBuffer single_page_bitstream;
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)page_k, 16);
+        BitBufferUtils::appendToBitstream(single_page_bitstream, bitstream_res.total_data_points, 16);
+        for (int j = 0; j < dim; ++j) {
+            BitBufferUtils::appendToBitstream(single_page_bitstream, preproc_res_ptr->max_decimal_places[j], 8);
+            long long val = preproc_res_ptr->min_values[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+        }
+        for (int j = 0; j < dim; ++j) {
+            long long val = basepoint_res.min_base[j]; long long abs_val = std::abs(val); int bit_len = BitBufferUtils::bitsRequiredNoSign(abs_val);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, bit_len, 8); BitBufferUtils::appendToBitstream(single_page_bitstream, (val >= 0 ? 0 : 1), 1); BitBufferUtils::appendToBitstream(single_page_bitstream, abs_val, bit_len);
+            BitBufferUtils::appendToBitstream(single_page_bitstream, basepoint_res.max_base_bit_len[j], 8);
+        }
+        BitBufferUtils::appendToBitstream(single_page_bitstream, (long long)bitstream_res.pack_res_metadata.size(), 16);
+        for (const auto& pack : bitstream_res.pack_res_metadata) for (int val : pack) BitBufferUtils::appendToBitstream(single_page_bitstream, val, 8);
+        single_page_bitstream.merge(basepoint_res.base_bitstream);
+        single_page_bitstream.merge(fre_bitstream);
+        single_page_bitstream.merge(bitstream_res.residual_bitstream);
+        
+        std::vector<uint8_t> compressed_bytes = single_page_bitstream.toByteArray();
+        uint32_t data_size = compressed_bytes.size();
+        out_stream.write_buf(reinterpret_cast<const char*>(&data_size), sizeof(data_size));
+        if (data_size > 0) {
+            out_stream.write_buf(reinterpret_cast<const char*>(compressed_bytes.data()), data_size);
+        }
+    } catch (const std::exception& e) {
+        reset();
+        return common::E_ENCODING_ERROR;
+    }
+    reset();
+    return common::E_OK;
+}
+} // namespace storage
+
+#endif // ENCODING_KCLUSTER_ENCODER_H
\ No newline at end of file
diff --git a/cpp/test/encoding/cluster_codec_test.cc b/cpp/test/encoding/cluster_codec_test.cc
new file mode 100644
index 0000000..7126bc2
--- /dev/null
+++ b/cpp/test/encoding/cluster_codec_test.cc
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <vector>
+#include <cmath>
+#include <cstdlib>
+
+// The encoder and decoder headers under test.
+#include "encoding/kcluster_encoder.h"
+#include "encoding/kcluster_decoder.h"
+#include "encoding/acluster_encoder.h"
+#include "encoding/acluster_decoder.h"
+
+// Note: this test uses raw new/delete, following the style of
+// ts2diff_codec_test.cc. That is fine for a standalone test; when integrated
+// into the project's test runner, constructing the codecs through the factory
+// (which may use a custom memory allocator) would be preferable.
+
+namespace storage {
+
+class ClusterCodecTest : public ::testing::Test {
+   protected:
+    void SetUp() override {
+        // KCluster instances
+        kcluster_encoder_double_ = new DoubleKClusterEncoder();
+        kcluster_decoder_double_ = new DoubleKClusterDecoder();
+        kcluster_encoder_int_ = new IntKClusterEncoder();
+        kcluster_decoder_int_ = new IntKClusterDecoder();
+        
+        // ACluster instances
+        acluster_encoder_double_ = new DoubleAClusterEncoder();
+        acluster_decoder_double_ = new DoubleAClusterDecoder();
+        acluster_encoder_int_ = new IntAClusterEncoder();
+        acluster_decoder_int_ = new IntAClusterDecoder();
+    }
+
+    void TearDown() override {
+        // KCluster
+        delete kcluster_encoder_double_;
+        delete kcluster_decoder_double_;
+        delete kcluster_encoder_int_;
+        delete kcluster_decoder_int_;
+
+        // ACluster
+        delete acluster_encoder_double_;
+        delete acluster_decoder_double_;
+        delete acluster_encoder_int_;
+        delete acluster_decoder_int_;
+    }
+
+    // KCluster pointers
+    DoubleKClusterEncoder* kcluster_encoder_double_;
+    DoubleKClusterDecoder* kcluster_decoder_double_;
+    IntKClusterEncoder* kcluster_encoder_int_;
+    IntKClusterDecoder* kcluster_decoder_int_;
+    
+    // ACluster pointers
+    DoubleAClusterEncoder* acluster_encoder_double_;
+    DoubleAClusterDecoder* acluster_decoder_double_;
+    IntAClusterEncoder* acluster_encoder_int_;
+    IntAClusterDecoder* acluster_decoder_int_;
+};
+
+
+// ===================================================================
+//   KCluster Tests
+// ===================================================================
+
+TEST_F(ClusterCodecTest, KClusterDoubleEncoding) {
+    common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+    const int row_num = 1000;  // The codec is page-based, so keep the point count within one page.
+    std::vector<double> data(row_num);
+    
+    // Generate some test data, e.g., a sine wave with some noise
+    for (int i = 0; i < row_num; i++) {
+        data[i] = 100.0 * std::sin(i * 0.1) + (static_cast<double>(rand()) / RAND_MAX - 0.5);
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(kcluster_encoder_double_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(kcluster_encoder_double_->flush(out_stream), common::E_OK);
+
+    double x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(kcluster_decoder_double_->read_double(x, out_stream), common::E_OK);
+        // Due to scaling and floating point precision, use ASSERT_NEAR for comparison
+        ASSERT_NEAR(x, data[i], 1e-9); 
+    }
+}
+
+TEST_F(ClusterCodecTest, KClusterIntEncoding) {
+    common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+    const int row_num = 1000;
+    std::vector<int64_t> data(row_num);
+    
+    // Generate some integer data
+    for (int i = 0; i < row_num; i++) {
+        data[i] = static_cast<int64_t>(i * i) - (i * 10);
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(kcluster_encoder_int_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(kcluster_encoder_int_->flush(out_stream), common::E_OK);
+
+    int64_t x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(kcluster_decoder_int_->read_int64(x, out_stream), common::E_OK);
+        EXPECT_EQ(x, data[i]);
+    }
+}
+
+
+// ===================================================================
+//   ACluster Tests
+// ===================================================================
+
+TEST_F(ClusterCodecTest, AClusterDoubleEncoding) {
+    common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+    const int row_num = 1000;
+    std::vector<double> data(row_num);
+    
+    // Generate some test data, e.g., a linear slope with some periodic component
+    for (int i = 0; i < row_num; i++) {
+        data[i] = 0.5 * i + 50.0 * std::cos(i * 0.2);
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(acluster_encoder_double_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(acluster_encoder_double_->flush(out_stream), common::E_OK);
+
+    double x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(acluster_decoder_double_->read_double(x, out_stream), common::E_OK);
+        ASSERT_NEAR(x, data[i], 1e-9);
+    }
+}
+
+TEST_F(ClusterCodecTest, AClusterIntEncoding) {
+    common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+    const int row_num = 1000;
+    std::vector<int64_t> data(row_num);
+    
+    // Generate some different integer data
+    for (int i = 0; i < row_num; i++) {
+        data[i] = 10000 + (i % 50) * 100; // Data that should cluster well
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(acluster_encoder_int_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(acluster_encoder_int_->flush(out_stream), common::E_OK);
+
+    int64_t x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(acluster_decoder_int_->read_int64(x, out_stream), common::E_OK);
+        EXPECT_EQ(x, data[i]);
+    }
+}
+
+TEST_F(ClusterCodecTest, KClusterFloatAsDoubleEncoding) {
+    common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+    const int row_num = 500;
+    std::vector<float> data(row_num);
+    
+    for (int i = 0; i < row_num; i++) {
+        data[i] = 123.45f + static_cast<float>(i);
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(kcluster_encoder_double_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(kcluster_encoder_double_->flush(out_stream), common::E_OK);
+
+    float x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(kcluster_decoder_double_->read_float(x, out_stream), common::E_OK);
+        ASSERT_NEAR(x, data[i], 1e-6);
+    }
+}
+
+TEST_F(ClusterCodecTest, AClusterInt32AsInt64Encoding) {
+    common::ByteStream out_stream(1024, common::MOD_DEFAULT, false);
+    const int row_num = 500;
+    std::vector<int32_t> data(row_num);
+    
+    for (int i = 0; i < row_num; i++) {
+        data[i] = 500 - i;
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(acluster_encoder_int_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(acluster_encoder_int_->flush(out_stream), common::E_OK);
+
+    int32_t x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(acluster_decoder_int_->read_int32(x, out_stream), common::E_OK);
+        EXPECT_EQ(x, data[i]);
+    }
+}
+
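+// A minimal round-trip sanity check for the shared bit-level primitives. This
+// assumes the header-only KClusterLogic::BitBuffer writer and the
+// AClusterDecodeLogic::BitStreamReader are visible through the includes
+// above; values written MSB-first should read back unchanged.
+TEST_F(ClusterCodecTest, BitPrimitivesRoundTrip) {
+    KClusterLogic::BitBuffer buf;
+    KClusterLogic::BitBufferUtils::appendToBitstream(buf, 5, 3);   // 101b
+    KClusterLogic::BitBufferUtils::appendToBitstream(buf, 1, 1);   // a sign bit
+    KClusterLogic::BitBufferUtils::appendToBitstream(buf, 42, 8);
+
+    std::vector<uint8_t> bytes = buf.toByteArray();
+    AClusterDecodeLogic::BitStreamReader reader(bytes);
+    EXPECT_EQ(reader.readBits(3), 5);
+    EXPECT_EQ(reader.readBits(1), 1);
+    EXPECT_EQ(reader.readBits(8), 42);
+}
+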
+}  // namespace storage
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index b05a920..8c6be19 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -56,7 +56,7 @@
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
-                <version>3.16.0</version>
+                <version>3.15.0</version>
             </dependency>
             <dependency>
                 <groupId>org.lz4</groupId>
@@ -99,7 +99,7 @@
             <dependency>
                 <groupId>com.google.code.gson</groupId>
                 <artifactId>gson</artifactId>
-                <version>2.11.0</version>
+                <version>2.10.1</version>
             </dependency>
         </dependencies>
     </dependencyManagement>
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
index 7047984..59a914f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
@@ -1,7 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.tsfile.encoding.decoder;
 
 import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.encoding.TsFileDecodingException;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 
 import java.io.IOException;
@@ -9,207 +27,199 @@
 
 public class ClusterDecoder extends Decoder {
 
-    private final TSDataType dataType;
+  private final TSDataType dataType;
 
+  private long[] longValues;
+  private double[] doubleValues;
+  private int readIndex = 0;
+  private int count = 0;
+  private boolean hasDecoded = false;
 
-    private long[] longValues;
-    private double[] doubleValues;
-    private int readIndex = 0;
-    private int count = 0;
-    private boolean hasDecoded = false;
+  public ClusterDecoder(TSDataType dataType) {
+    super(TSEncoding.ACLUSTER);
+    this.dataType = dataType;
+  }
 
-    public ClusterDecoder(TSDataType dataType) {
-        super(TSEncoding.ACLUSTER);
-        this.dataType = dataType;
+  /** Checks whether another value is available; decodes the whole page on the first call. */
+  @Override
+  public boolean hasNext(ByteBuffer buffer) throws IOException {
+    if (!hasDecoded) {
+      decodeInternally(buffer);
+    }
+    return readIndex < count;
+  }
+
+  /** Resets decoder state so the next call decodes a fresh page. */
+  @Override
+  public void reset() {
+    this.hasDecoded = false;
+    this.readIndex = 0;
+    this.count = 0;
+    this.longValues = null;
+    this.doubleValues = null;
+  }
+
+  @Override
+  public int readInt(ByteBuffer buffer) {
+    return (int) longValues[readIndex++];
+  }
+
+  @Override
+  public long readLong(ByteBuffer buffer) {
+    return longValues[readIndex++];
+  }
+
+  @Override
+  public float readFloat(ByteBuffer buffer) {
+    return (float) doubleValues[readIndex++];
+  }
+
+  @Override
+  public double readDouble(ByteBuffer buffer) {
+    return doubleValues[readIndex++];
+  }
+
+  /** Internal decode method: decodes one ACluster-encoded page into memory. */
+  private void decodeInternally(ByteBuffer buffer) {
+    if (hasDecoded) {
+      return;
     }
 
-    /**
-     * check has next
-     */
-    @Override
-    public boolean hasNext(ByteBuffer buffer) throws IOException {
-        if (!hasDecoded) {
-            decodeInternally(buffer);
-        }
-        return readIndex < count;
+    ClusterReader reader = new ClusterReader(buffer);
+
+    // --- Header ---
+    int scalingExponent = (int) reader.read(8);
+    int k = (int) reader.read(16);
+    this.count = (int) reader.read(16);
+    int packSize = (int) reader.read(16);
+
+    // --- Global Minimum Value (minVal) ---
+    int minValBit = (int) reader.read(8);
+    long minValSign = reader.read(1);
+    long absMinVal = reader.read(minValBit); // Read the absolute value part
+    long minVal = (minValSign == 1) ? -absMinVal : absMinVal; // Apply the sign
+
+    // Allocate memory based on count and dataType
+    if (this.count > 0) {
+      if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
+        this.doubleValues = new double[this.count];
+      } else {
+        this.longValues = new long[this.count];
+      }
     }
 
-    /**
-     * reset
-     */
-    @Override
-    public void reset() {
-        this.hasDecoded = false;
-        this.readIndex = 0;
-        this.count = 0;
-        this.longValues = null;
-        this.doubleValues = null;
+    // Handle case where all values are the same (k=0)
+    if (k == 0) {
+      if (this.count > 0) {
+        reconstructFromMinValOnly(minVal, scalingExponent);
+      }
+      this.hasDecoded = true;
+      return;
     }
 
-    @Override
-    public int readInt(ByteBuffer buffer){
-        return (int) longValues[readIndex++];
+    // --- Medoids ---
+    long[] medoids = new long[k];
+    int minMedoidBit = (int) reader.read(8);
+    long minMedoidSign = reader.read(1);
+    long absMinMedoid = reader.read(minMedoidBit); // Read the absolute value part
+    long minMedoid = (minMedoidSign == 1) ? -absMinMedoid : absMinMedoid; // Apply the sign
+
+    int maxMedoidOffsetBits = (int) reader.read(8);
+    for (int i = 0; i < k; i++) {
+      long offset = reader.read(maxMedoidOffsetBits);
+      medoids[i] = minMedoid + offset;
     }
 
-    @Override
-    public long readLong(ByteBuffer buffer){
-        return longValues[readIndex++];
+    // --- Frequencies (Cluster Sizes) ---
+    // The encoder wrote deltas between consecutive (size-sorted) cluster sizes,
+    // so we accumulate them to restore each cluster's size
+    long[] clusterSizes = new long[k];
+    int numFreqBlocks = (int) reader.read(16);
+
+    // Metadata pass for frequencies
+    int[] freqBlockMaxBits = new int[numFreqBlocks];
+    for (int i = 0; i < numFreqBlocks; i++) {
+      freqBlockMaxBits[i] = (int) reader.read(8);
     }
 
-    @Override
-    public float readFloat(ByteBuffer buffer){
-        return (float) doubleValues[readIndex++];
+    // Data pass for frequencies - accumulate deltas to restore per-cluster sizes
+    long runningSize = 0;
+    int freqIndex = 0;
+    for (int i = 0; i < numFreqBlocks; i++) {
+      int start = i * packSize;
+      int end = Math.min(start + packSize, k);
+      int bitsForBlock = freqBlockMaxBits[i];
+      for (int j = start; j < end; j++) {
+        long delta = reader.read(bitsForBlock); // delta from the previous cluster's size
+        runningSize += delta;
+        clusterSizes[freqIndex++] = runningSize;
+      }
     }
 
-    @Override
-    public double readDouble(ByteBuffer buffer){
-        return doubleValues[readIndex++];
+    // --- Residuals ---
+    long[] residuals = new long[this.count];
+    int numPacks = (int) reader.read(32);
+
+    // Metadata pass for residuals
+    int[] resPackMaxBits = new int[numPacks];
+    for (int i = 0; i < numPacks; i++) {
+      resPackMaxBits[i] = (int) reader.read(8);
     }
 
-
-    /**
-     * Internal decode method
-     * Decode ACluster
-     */
-    private void decodeInternally(ByteBuffer buffer) {
-        if (hasDecoded) {
-            return;
+    // Data pass for residuals
+    int residualIdx = 0;
+    for (int i = 0; i < numPacks; i++) {
+      int start = i * packSize;
+      int end = Math.min(start + packSize, this.count);
+      int bitsForPack = resPackMaxBits[i];
+      if (bitsForPack > 0) {
+        for (int j = start; j < end; j++) {
+          residuals[residualIdx++] = reader.read(bitsForPack);
         }
-
-        ClusterReader reader = new ClusterReader(buffer);
-
-        // --- Header ---
-        int scalingExponent = (int) reader.read(8);
-        int k = (int) reader.read(16);
-        this.count = (int) reader.read(16);
-        int packSize = (int) reader.read(16);
-
-        // --- Global Minimum Value (minVal) ---
-        int minValBit = (int) reader.read(8);
-        long minValSign = reader.read(1);
-        long absMinVal = reader.read(minValBit); // Read the absolute value part
-        long minVal = (minValSign == 1) ? -absMinVal : absMinVal; // Apply the sign
-
-        // Allocate memory based on count and dataType
-        if (this.count > 0) {
-            if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
-                this.doubleValues = new double[this.count];
-            } else {
-                this.longValues = new long[this.count];
-            }
-        }
-
-        // Handle case where all values are the same (k=0)
-        if (k == 0) {
-            if (this.count > 0) {
-                reconstructFromMinValOnly(minVal, scalingExponent);
-            }
-            this.hasDecoded = true;
-            return;
-        }
-
-        // --- Medoids ---
-        long[] medoids = new long[k];
-        int minMedoidBit = (int) reader.read(8);
-        long minMedoidSign = reader.read(1);
-        long absMinMedoid = reader.read(minMedoidBit); // Read the absolute value part
-        long minMedoid = (minMedoidSign == 1) ? -absMinMedoid : absMinMedoid; // Apply the sign
-
-        int maxMedoidOffsetBits = (int) reader.read(8);
-        for (int i = 0; i < k; i++) {
-            long offset = reader.read(maxMedoidOffsetBits);
-            medoids[i] = minMedoid + offset;
-        }
-
-        // --- Frequencies (Cluster Sizes) ---
-        // The encoder wrote deltas (cluster sizes), so we read them and rebuild the cumulative array
-        long[] cumulativeFrequencies = new long[k];
-        int numFreqBlocks = (int) reader.read(16);
-
-        // Metadata pass for frequencies
-        int[] freqBlockMaxBits = new int[numFreqBlocks];
-        for (int i = 0; i < numFreqBlocks; i++) {
-            freqBlockMaxBits[i] = (int) reader.read(8);
-        }
-
-        // Data pass for frequencies - Reconstruct cumulative frequencies
-        long currentCumulativeFreq = 0;
-        int freqIndex = 0;
-        for (int i = 0; i < numFreqBlocks; i++) {
-            int start = i * packSize;
-            int end = Math.min(start + packSize, k);
-            int bitsForBlock = freqBlockMaxBits[i];
-            for (int j = start; j < end; j++) {
-                long delta = reader.read(bitsForBlock); // This delta is the actual cluster size
-                currentCumulativeFreq += delta;
-                cumulativeFrequencies[freqIndex++] = currentCumulativeFreq;
-            }
-        }
-
-        // --- Residuals ---
-        long[] residuals = new long[this.count];
-        int numPacks = (int) reader.read(32);
-
-        // Metadata pass for residuals
-        int[] resPackMaxBits = new int[numPacks];
-        for (int i = 0; i < numPacks; i++) {
-            resPackMaxBits[i] = (int) reader.read(8);
-        }
-
-        // Data pass for residuals
-        int residualIdx = 0;
-        for (int i = 0; i < numPacks; i++) {
-            int start = i * packSize;
-            int end = Math.min(start + packSize, this.count);
-            int bitsForPack = resPackMaxBits[i];
-            if (bitsForPack > 0) {
-                for (int j = start; j < end; j++) {
-                    residuals[residualIdx++] = reader.read(bitsForPack);
-                }
-            } else {
-                // If bitsForPack is 0, all residuals in this pack are 0.
-                // We just need to advance the index, as the array is already initialized to 0.
-                residualIdx += (end - start);
-            }
-        }
-
-        // --- Final Data Reconstruction ---
-        // Use the correctly reconstructed cumulativeFrequencies array
-        reconstructData(medoids, cumulativeFrequencies, residuals, minVal, scalingExponent);
-
-        this.hasDecoded = true;
+      } else {
+        // If bitsForPack is 0, all residuals in this pack are 0.
+        // We just need to advance the index, as the array is already initialized to 0.
+        residualIdx += (end - start);
+      }
     }
 
-    private void reconstructData(long[] medoids, long[] frequencies, long[] residuals, long minVal, int scalingExponent) {
-        int residualReadPos = 0;
-        int dataWritePos = 0;
-        double scalingFactor = Math.pow(10, scalingExponent);
+    // --- Final Data Reconstruction ---
+    // Use the restored per-cluster sizes
+    reconstructData(medoids, clusterSizes, residuals, minVal, scalingExponent);
 
-        for (int clusterId = 0; clusterId < medoids.length; clusterId++) {
-            long pointsInThisCluster = frequencies[clusterId];
-            long medoid = medoids[clusterId];
+    this.hasDecoded = true;
+  }
 
-            for (int i = 0; i < pointsInThisCluster; i++) {
-                long zigzagResidual = residuals[residualReadPos++];
-                long residual = (zigzagResidual >>> 1) ^ -(zigzagResidual & 1);
-                long scaledDataPoint = medoid + residual + minVal;
+  private void reconstructData(
+      long[] medoids, long[] frequencies, long[] residuals, long minVal, int scalingExponent) {
+    int residualReadPos = 0;
+    int dataWritePos = 0;
+    double scalingFactor = Math.pow(10, scalingExponent);
 
-                if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
-                    doubleValues[dataWritePos++] = scaledDataPoint / scalingFactor;
-                } else {
-                    longValues[dataWritePos++] = scaledDataPoint;
-                }
-            }
-        }
-    }
+    for (int clusterId = 0; clusterId < medoids.length; clusterId++) {
+      long pointsInThisCluster = frequencies[clusterId];
+      long medoid = medoids[clusterId];
 
-    private void reconstructFromMinValOnly(long minVal, int scalingExponent) {
+      for (int i = 0; i < pointsInThisCluster; i++) {
+        long zigzagResidual = residuals[residualReadPos++];
+        long residual = (zigzagResidual >>> 1) ^ -(zigzagResidual & 1);
+        long scaledDataPoint = medoid + residual + minVal;
+
         if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
-            double scalingFactor = Math.pow(10, scalingExponent);
-            double finalValue = minVal / scalingFactor;
-            for(int i=0; i<this.count; i++) doubleValues[i] = finalValue;
+          doubleValues[dataWritePos++] = scaledDataPoint / scalingFactor;
         } else {
-            for(int i=0; i<this.count; i++) longValues[i] = minVal;
+          longValues[dataWritePos++] = scaledDataPoint;
         }
+      }
     }
+  }
+
+  private void reconstructFromMinValOnly(long minVal, int scalingExponent) {
+    if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
+      double scalingFactor = Math.pow(10, scalingExponent);
+      double finalValue = minVal / scalingFactor;
+      for (int i = 0; i < this.count; i++) doubleValues[i] = finalValue;
+    } else {
+      for (int i = 0; i < this.count; i++) longValues[i] = minVal;
+    }
+  }
 }
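Review note: the residuals consumed in reconstructData use the standard zigzag mapping, mirroring calculateSortedZigzagResiduals in AClusterEncoder below. A self-contained sketch of the pair, with a worked value:

    // Zigzag maps signed residuals to small non-negative codes for bit packing:
    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
    static long zigzagEncode(long residual) {
      return (residual << 1) ^ (residual >> 63); // arithmetic shift spreads the sign bit
    }

    static long zigzagDecode(long code) {
      return (code >>> 1) ^ -(code & 1); // -(code & 1) is 0 or an all-ones mask
    }

    // e.g. zigzagEncode(-3) == 5 and zigzagDecode(5) == -3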
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
index ad578ad..99ebb66 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
@@ -1,35 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.tsfile.encoding.decoder;
 
 import java.nio.ByteBuffer;
 
 public class ClusterReader {
-    private final ByteBuffer buffer;
-    private byte currentByte;
-    private int bitPosition; // from 7 down to 0
+  private final ByteBuffer buffer;
+  private byte currentByte;
+  private int bitPosition; // from 7 down to 0
 
-    public ClusterReader(ByteBuffer buffer) {
-        this.buffer = buffer;
-        this.currentByte = 0;
-        this.bitPosition = -1; // Start at -1 to force reading a new byte first
+  public ClusterReader(ByteBuffer buffer) {
+    this.buffer = buffer;
+    this.currentByte = 0;
+    this.bitPosition = -1; // Start at -1 to force reading a new byte first
+  }
+
+  public long read(int numBits) {
+    if (numBits > 64 || numBits <= 0) {
+      throw new IllegalArgumentException(
+          "numBits must be between 1 and 64, but was " + numBits);
     }
 
-    public long read(int numBits) {
-        if (numBits > 64 || numBits <= 0) {
-            throw new IllegalArgumentException("Cannot read more than 64 bits or non-positive bits at once.");
-        }
-
-        long result = 0;
-        for (int i = 0; i < numBits; i++) {
-            if (bitPosition < 0) {
-                currentByte = buffer.get();
-                bitPosition = 7;
-            }
-            // Read the bit at the current position
-            long bit = (currentByte >> bitPosition) & 1;
-            // Shift the result and add the new bit
-            result = (result << 1) | bit;
-            bitPosition--;
-        }
-        return result;
+    long result = 0;
+    for (int i = 0; i < numBits; i++) {
+      if (bitPosition < 0) {
+        currentByte = buffer.get();
+        bitPosition = 7;
+      }
+      // Read the bit at the current position
+      long bit = (currentByte >> bitPosition) & 1;
+      // Shift the result and add the new bit
+      result = (result << 1) | bit;
+      bitPosition--;
     }
-}
\ No newline at end of file
+    return result;
+  }
+}
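Review note: ClusterReader consumes bits MSB-first within each byte. For reference, a minimal writer with the matching bit order; this is an illustrative sketch only, since the real counterpart is ClusterSupport.write, whose body falls outside the visible hunk:

    static void writeBits(java.io.ByteArrayOutputStream out, long value, int numBits) {
      int bitPosition = 7; // mirror of ClusterReader: fill each byte from bit 7 down
      int currentByte = 0;
      for (int i = numBits - 1; i >= 0; i--) {
        int bit = (int) ((value >>> i) & 1); // emit value's bits MSB-first
        currentByte |= bit << bitPosition;
        if (--bitPosition < 0) {
          out.write(currentByte);
          currentByte = 0;
          bitPosition = 7;
        }
      }
      if (bitPosition != 7) {
        out.write(currentByte); // zero-pad and flush the trailing partial byte
      }
    }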
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
index bc61bda..9f3a724 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
@@ -177,14 +177,18 @@
           default:
             throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
         }
-      case ACLUSTER: case KCLUSTER:
-        switch (dataType){
-          case INT32: case INT64: case FLOAT: case DOUBLE:
+      case ACLUSTER:
+      case KCLUSTER:
+        switch (dataType) {
+          case INT32:
+          case INT64:
+          case FLOAT:
+          case DOUBLE:
             return new ClusterDecoder(dataType);
           default:
             throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
         }
-        default:
+      default:
         throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
     }
   }
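Review note: with the two new branches, both cluster encodings share one decoder for the four numeric types. A usage sketch, assuming the enclosing factory method is Decoder.getDecoderByType as the surrounding context suggests:

    Decoder acluster = Decoder.getDecoderByType(TSEncoding.ACLUSTER, TSDataType.INT64);
    Decoder kcluster = Decoder.getDecoderByType(TSEncoding.KCLUSTER, TSDataType.DOUBLE);
    // Any other data type still falls through to TsFileDecodingException.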
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
index 44d4d91..f4e079f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
@@ -24,190 +24,185 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 
 public final class AClusterAlgorithm {
 
-    /**
-     * Private constructor to prevent instantiation of this utility class.
-     */
-    private AClusterAlgorithm() {}
+  /** Private constructor to prevent instantiation of this utility class. */
+  private AClusterAlgorithm() {}
 
-    /**
-     * The main entry point for the ACluster algorithm.
-     * It processes a page of data and returns the results as an Object array.
-     *
-     * @param data The input time series data for a single page, represented as a long array.
-     * @return An Object array where: <br>
-     *         - index 0: long[] medoids (sorted by cluster frequency) <br>
-     *         - index 1: int[] clusterAssignments (mapped to the sorted medoids) <br>
-     *         - index 2: long[] clusterFrequencies (sorted)
-     */
-    public static Object[] run(long[] data) {
-        int n = data.length;
-        if (n == 0) {
-            return new Object[]{new long[0], new int[0], new long[0]};
-        }
-
-        // --- Initialization ---
-        List<Long> medoids = new ArrayList<>();
-        Set<Long> existingMedoids = new HashSet<>();
-        List<Set<Integer>> pointsInClusters = new ArrayList<>();
-        int[] pointToMedoidMap = new int[n];
-
-        // --- Step 1: Initialize with the first data point ---
-        long firstPoint = data[0];
-        medoids.add(firstPoint);
-        existingMedoids.add(firstPoint);
-        Set<Integer> firstCluster = new HashSet<>();
-        firstCluster.add(0);
-        pointsInClusters.add(firstCluster);
-        pointToMedoidMap[0] = 0;
-
-        // --- Step 2: Iteratively process the rest of the points ---
-        for (int i = 1; i < n; i++) {
-            long currentPoint = data[i];
-
-            if (existingMedoids.contains(currentPoint)) {
-                int medoidIndex = medoids.indexOf(currentPoint);
-                pointsInClusters.get(medoidIndex).add(i);
-                pointToMedoidMap[i] = medoidIndex;
-                continue;
-            }
-
-            // --- Step 3: Find the best existing medoid ---
-            int bestMedoidIndex = -1;
-            long minCostToExistingMedoid = Long.MAX_VALUE;
-            for (int j = 0; j < medoids.size(); j++) {
-                long cost = calculateResidualCost(currentPoint, medoids.get(j));
-                if (cost < minCostToExistingMedoid) {
-                    minCostToExistingMedoid = cost;
-                    bestMedoidIndex = j;
-                }
-            }
-
-            // --- Step 4: Calculate potential savings ---
-            long savingsFromCurrentPoint = minCostToExistingMedoid;
-            long savingsFromReassignment = 0;
-            Set<Integer> pointsInBestCluster = pointsInClusters.get(bestMedoidIndex);
-            for (int pointIndexInCluster : pointsInBestCluster) {
-                long p = data[pointIndexInCluster];
-                long costToOldMedoid = calculateResidualCost(p, medoids.get(bestMedoidIndex));
-                long costToNewPotentialMedoid = calculateResidualCost(p, currentPoint);
-                if (costToNewPotentialMedoid < costToOldMedoid) {
-                    savingsFromReassignment += (costToOldMedoid - costToNewPotentialMedoid);
-                }
-            }
-            long totalSavings = savingsFromCurrentPoint + savingsFromReassignment;
-
-            // --- Step 5: Make the decision ---
-            long storageCostForNewPoint = calculateBasePointStorageCost(currentPoint);
-            if (totalSavings > storageCostForNewPoint) {
-                // Decision: Create a new medoid.
-                int newMedoidId = medoids.size();
-                medoids.add(currentPoint);
-                existingMedoids.add(currentPoint);
-                Set<Integer> newCluster = new HashSet<>();
-                newCluster.add(i);
-                pointsInClusters.add(newCluster);
-                pointToMedoidMap[i] = newMedoidId;
-
-                Set<Integer> pointsToReEvaluate = new HashSet<>(pointsInClusters.get(bestMedoidIndex));
-                for (int pointIndexToReEvaluate : pointsToReEvaluate) {
-                    long p = data[pointIndexToReEvaluate];
-                    if (calculateResidualCost(p, currentPoint) < calculateResidualCost(p, medoids.get(bestMedoidIndex))) {
-                        pointsInClusters.get(bestMedoidIndex).remove(pointIndexToReEvaluate);
-                        pointsInClusters.get(newMedoidId).add(pointIndexToReEvaluate);
-                        pointToMedoidMap[pointIndexToReEvaluate] = newMedoidId;
-                    }
-                }
-            } else {
-                // Decision: Assign to the existing best medoid.
-                pointsInClusters.get(bestMedoidIndex).add(i);
-                pointToMedoidMap[i] = bestMedoidIndex;
-            }
-        }
-
-        // --- Step 6: Finalize and sort the results ---
-        int k = medoids.size();
-        long[] finalMedoids = medoids.stream().mapToLong(l -> l).toArray();
-        long[] rawClusterSizes = new long[k];
-        for (int i = 0; i < k; i++) {
-            rawClusterSizes[i] = pointsInClusters.get(i).size();
-        }
-
-        return sortResults(finalMedoids, pointToMedoidMap, rawClusterSizes);
+  /**
+   * The main entry point for the ACluster algorithm. It processes a page of data and returns the
+   * results as an Object array.
+   *
+   * @param data The input time series data for a single page, represented as a long array.
+   * @return An Object array where: <br>
+   *     - index 0: long[] medoids (sorted by cluster frequency) <br>
+   *     - index 1: int[] clusterAssignments (mapped to the sorted medoids) <br>
+   *     - index 2: long[] clusterFrequencies (sorted)
+   */
+  public static Object[] run(long[] data) {
+    int n = data.length;
+    if (n == 0) {
+      return new Object[] {new long[0], new int[0], new long[0]};
     }
 
-    /**
-     * Helper class for sorting medoids based on their cluster size (frequency).
-     */
-    private static class MedoidSortHelper implements Comparable<MedoidSortHelper> {
-        long medoid;
-        long size;
-        int originalIndex;
+    // --- Initialization ---
+    List<Long> medoids = new ArrayList<>();
+    Set<Long> existingMedoids = new HashSet<>();
+    List<Set<Integer>> pointsInClusters = new ArrayList<>();
+    int[] pointToMedoidMap = new int[n];
 
-        MedoidSortHelper(long medoid, long size, int originalIndex) {
-            this.medoid = medoid;
-            this.size = size;
-            this.originalIndex = originalIndex;
-        }
+    // --- Step 1: Initialize with the first data point ---
+    long firstPoint = data[0];
+    medoids.add(firstPoint);
+    existingMedoids.add(firstPoint);
+    Set<Integer> firstCluster = new HashSet<>();
+    firstCluster.add(0);
+    pointsInClusters.add(firstCluster);
+    pointToMedoidMap[0] = 0;
 
-        @Override
-        public int compareTo(MedoidSortHelper other) {
-            return Long.compare(this.size, other.size);
+    // --- Step 2: Iteratively process the rest of the points ---
+    for (int i = 1; i < n; i++) {
+      long currentPoint = data[i];
+
+      if (existingMedoids.contains(currentPoint)) {
+        int medoidIndex = medoids.indexOf(currentPoint);
+        pointsInClusters.get(medoidIndex).add(i);
+        pointToMedoidMap[i] = medoidIndex;
+        continue;
+      }
+
+      // --- Step 3: Find the best existing medoid ---
+      int bestMedoidIndex = -1;
+      long minCostToExistingMedoid = Long.MAX_VALUE;
+      for (int j = 0; j < medoids.size(); j++) {
+        long cost = calculateResidualCost(currentPoint, medoids.get(j));
+        if (cost < minCostToExistingMedoid) {
+          minCostToExistingMedoid = cost;
+          bestMedoidIndex = j;
         }
+      }
+
+      // --- Step 4: Calculate potential savings ---
+      long savingsFromCurrentPoint = minCostToExistingMedoid;
+      long savingsFromReassignment = 0;
+      Set<Integer> pointsInBestCluster = pointsInClusters.get(bestMedoidIndex);
+      for (int pointIndexInCluster : pointsInBestCluster) {
+        long p = data[pointIndexInCluster];
+        long costToOldMedoid = calculateResidualCost(p, medoids.get(bestMedoidIndex));
+        long costToNewPotentialMedoid = calculateResidualCost(p, currentPoint);
+        if (costToNewPotentialMedoid < costToOldMedoid) {
+          savingsFromReassignment += (costToOldMedoid - costToNewPotentialMedoid);
+        }
+      }
+      long totalSavings = savingsFromCurrentPoint + savingsFromReassignment;
+
+      // --- Step 5: Make the decision ---
+      long storageCostForNewPoint = calculateBasePointStorageCost(currentPoint);
+      if (totalSavings > storageCostForNewPoint) {
+        // Decision: Create a new medoid.
+        int newMedoidId = medoids.size();
+        medoids.add(currentPoint);
+        existingMedoids.add(currentPoint);
+        Set<Integer> newCluster = new HashSet<>();
+        newCluster.add(i);
+        pointsInClusters.add(newCluster);
+        pointToMedoidMap[i] = newMedoidId;
+
+        Set<Integer> pointsToReEvaluate = new HashSet<>(pointsInClusters.get(bestMedoidIndex));
+        for (int pointIndexToReEvaluate : pointsToReEvaluate) {
+          long p = data[pointIndexToReEvaluate];
+          if (calculateResidualCost(p, currentPoint)
+              < calculateResidualCost(p, medoids.get(bestMedoidIndex))) {
+            pointsInClusters.get(bestMedoidIndex).remove(pointIndexToReEvaluate);
+            pointsInClusters.get(newMedoidId).add(pointIndexToReEvaluate);
+            pointToMedoidMap[pointIndexToReEvaluate] = newMedoidId;
+          }
+        }
+      } else {
+        // Decision: Assign to the existing best medoid.
+        pointsInClusters.get(bestMedoidIndex).add(i);
+        pointToMedoidMap[i] = bestMedoidIndex;
+      }
     }
 
-    /**
-     * Sorts the final medoids and their cluster information based on cluster frequency.
-     *
-     * @param medoids The discovered medoids.
-     * @param clusterAssignment The assignment map for each data point.
-     * @param clusterSize The frequency of each cluster.
-     * @return A sorted and correctly mapped Object array.
-     */
-    private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
-        int k = medoids.length;
-        List<MedoidSortHelper> sorters = new ArrayList<>();
-        for (int i = 0; i < k; i++) {
-            sorters.add(new MedoidSortHelper(medoids[i], clusterSize[i], i));
-        }
-        Collections.sort(sorters);
-
-        long[] sortedMedoids = new long[k];
-        long[] sortedClusterSize = new long[k];
-        int[] oldToNewIndexMap = new int[k];
-
-        for (int i = 0; i < k; i++) {
-            MedoidSortHelper sortedItem = sorters.get(i);
-            sortedMedoids[i] = sortedItem.medoid;
-            sortedClusterSize[i] = sortedItem.size;
-            oldToNewIndexMap[sortedItem.originalIndex] = i;
-        }
-
-        int[] sortedClusterAssignment = new int[clusterAssignment.length];
-        for (int i = 0; i < clusterAssignment.length; i++) {
-            int oldIndex = clusterAssignment[i];
-            sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
-        }
-
-        return new Object[]{sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+    // --- Step 6: Finalize and sort the results ---
+    int k = medoids.size();
+    long[] finalMedoids = medoids.stream().mapToLong(l -> l).toArray();
+    long[] rawClusterSizes = new long[k];
+    for (int i = 0; i < k; i++) {
+      rawClusterSizes[i] = pointsInClusters.get(i).size();
     }
 
-    // --- Cost Calculation Functions ---
+    return sortResults(finalMedoids, pointToMedoidMap, rawClusterSizes);
+  }
 
-    private static long bitLengthCost(long value) {
-        if (value == 0) return 1;
-        return 64 - Long.numberOfLeadingZeros(value);
+  /** Helper class for sorting medoids based on their cluster size (frequency). */
+  private static class MedoidSortHelper implements Comparable<MedoidSortHelper> {
+    long medoid;
+    long size;
+    int originalIndex;
+
+    MedoidSortHelper(long medoid, long size, int originalIndex) {
+      this.medoid = medoid;
+      this.size = size;
+      this.originalIndex = originalIndex;
     }
 
-    private static long calculateResidualCost(long p1, long p2) {
-        return 1 + bitLengthCost(Math.abs(p1 - p2));
+    @Override
+    public int compareTo(MedoidSortHelper other) {
+      return Long.compare(this.size, other.size);
+    }
+  }
+
+  /**
+   * Sorts the final medoids and their cluster information based on cluster frequency.
+   *
+   * @param medoids The discovered medoids.
+   * @param clusterAssignment The assignment map for each data point.
+   * @param clusterSize The frequency of each cluster.
+   * @return A sorted and correctly mapped Object array.
+   */
+  private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
+    int k = medoids.length;
+    List<MedoidSortHelper> sorters = new ArrayList<>();
+    for (int i = 0; i < k; i++) {
+      sorters.add(new MedoidSortHelper(medoids[i], clusterSize[i], i));
+    }
+    Collections.sort(sorters);
+
+    long[] sortedMedoids = new long[k];
+    long[] sortedClusterSize = new long[k];
+    int[] oldToNewIndexMap = new int[k];
+
+    for (int i = 0; i < k; i++) {
+      MedoidSortHelper sortedItem = sorters.get(i);
+      sortedMedoids[i] = sortedItem.medoid;
+      sortedClusterSize[i] = sortedItem.size;
+      oldToNewIndexMap[sortedItem.originalIndex] = i;
     }
 
-    private static long calculateBasePointStorageCost(long basePoint) {
-        return 1 + bitLengthCost(Math.abs(basePoint));
+    int[] sortedClusterAssignment = new int[clusterAssignment.length];
+    for (int i = 0; i < clusterAssignment.length; i++) {
+      int oldIndex = clusterAssignment[i];
+      sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
     }
+
+    return new Object[] {sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+  }
+
+  // --- Cost Calculation Functions ---
+
+  private static long bitLengthCost(long value) {
+    if (value == 0) return 1;
+    return 64 - Long.numberOfLeadingZeros(value);
+  }
+
+  private static long calculateResidualCost(long p1, long p2) {
+    return 1 + bitLengthCost(Math.abs(p1 - p2));
+  }
+
+  private static long calculateBasePointStorageCost(long basePoint) {
+    return 1 + bitLengthCost(Math.abs(basePoint));
+  }
 }
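Review note: the create-vs-assign decision in Step 5 is driven entirely by the bit-cost helpers at the bottom of the class. A worked example under those definitions (all values already min-subtracted):

    // Nearest medoid m = 0, incoming point p = 100, no reassignment savings:
    //   calculateResidualCost(100, 0)      = 1 + bitLength(100) = 1 + 7 = 8
    //   calculateBasePointStorageCost(100) = 1 + bitLength(100) = 1 + 7 = 8
    // totalSavings (8) is not strictly greater than the storage cost (8), so p
    // joins the existing cluster. A new medoid is only created when reassigning
    // neighbors saves additional bits beyond the medoid's own storage cost.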
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
index 8e2ca0b..1cea597 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
@@ -19,8 +19,8 @@
 
 package org.apache.tsfile.encoding.encoder;
 
-import org.apache.tsfile.exception.encoding.TsFileEncodingException;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.encoding.TsFileEncodingException;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
 
@@ -37,386 +37,430 @@
  */
 public class AClusterEncoder extends Encoder {
 
-    private static final int DEFAULT_PACK_SIZE = 10;
+  private static final int DEFAULT_PACK_SIZE = 10;
 
+  /** A buffer to store all values of a page before flushing. We use long to unify all types. */
+  private interface ValueBuffer {
+    /** Adds a value to the buffer. */
+    void add(int value);
 
-    /** A buffer to store all values of a page before flushing. We use long to unify all types. */
-    private interface ValueBuffer {
-        /** Adds a value to the buffer. */
-        void add(int value);
-        void add(long value);
-        void add(float value);
-        void add(double value);
+    void add(long value);
 
-        /** Clears the internal buffer. */
-        void clear();
+    void add(float value);
 
-        /** Checks if the buffer is empty. */
-        boolean isEmpty();
+    void add(double value);
 
-        /** Gets the current size of the buffer. */
-        int size();
+    /** Clears the internal buffer. */
+    void clear();
 
-        ProcessingResult processAndGet();
-    }
+    /** Checks if the buffer is empty. */
+    boolean isEmpty();
 
-    private static class Int64Buffer implements ValueBuffer {
-        private final List<Long> values = new ArrayList<>();
+    /** Gets the current size of the buffer. */
+    int size();
 
-        @Override
-        public void add(int value) { values.add((long) value); }
-        @Override
-        public void add(long value) { values.add(value); }
-        @Override
-        public void add(float value) { /* Do nothing, type mismatch */ }
-        @Override
-        public void add(double value) { /* Do nothing, type mismatch */ }
+    ProcessingResult processAndGet();
+  }
 
-        @Override
-        public ProcessingResult processAndGet() { // <--- 实现新方法
-            long[] data = values.stream().mapToLong(l -> l).toArray();
-            return new ProcessingResult(data, 0); // Exponent is 0 for integers
-        }
+  private static class Int64Buffer implements ValueBuffer {
+    private final List<Long> values = new ArrayList<>();
 
-        @Override
-        public void clear() { values.clear(); }
-        @Override
-        public boolean isEmpty() { return values.isEmpty(); }
-        @Override
-        public int size() { return values.size(); }
-    }
-
-    /** A buffer for FLOAT and DOUBLE types. It performs scaling in processAndGetLongs(). */
-    private static class DoubleBuffer implements ValueBuffer {
-        private final List<Double> values = new ArrayList<>();
-
-        @Override
-        public void add(int value) { /* Do nothing, type mismatch */ }
-        @Override
-        public void add(long value) { /* Do nothing, type mismatch */ }
-        @Override
-        public void add(float value) { values.add((double) value); } // Store as double to unify
-        @Override
-        public void add(double value) { values.add(value); }
-
-        @Override
-        public ProcessingResult processAndGet() {
-            // --- Edge Case: Handle empty buffer ---
-            if (values.isEmpty()) {
-                return new ProcessingResult(new long[0], 0);
-            }
-            int maxDecimalPlaces = 0;
-            for (double v : values) {
-                String s = BigDecimal.valueOf(v).toPlainString();
-                int dotIndex = s.indexOf('.');
-                if (dotIndex != -1) {
-                    int decimalPlaces = s.length() - dotIndex - 1;
-                    if (decimalPlaces > maxDecimalPlaces) {
-                        maxDecimalPlaces = decimalPlaces;
-                    }
-                }
-            }
-
-            double scalingFactor = Math.pow(10, maxDecimalPlaces);
-
-            long[] scaledLongs = new long[values.size()];
-            for (int i = 0; i < values.size(); i++) {
-                scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
-            }
-
-            return new ProcessingResult(scaledLongs, maxDecimalPlaces);
-        }
-
-        @Override
-        public void clear() { values.clear(); }
-        @Override
-        public boolean isEmpty() { return values.isEmpty(); }
-        @Override
-        public int size() { return values.size(); }
-    }
-
-    private static class ProcessingResult {
-        final long[] scaledLongs;
-        final int scalingExponent; // e.g., 3 for a scaling factor of 1000
-
-        ProcessingResult(long[] scaledLongs, int scalingExponent) {
-            this.scaledLongs = scaledLongs;
-            this.scalingExponent = scalingExponent;
-        }
-
-        long[] getScaledLongs(){
-            return this.scaledLongs;
-        }
-
-        int getScalingExponent(){
-            return this.scalingExponent;
-        }
-    }
-
-
-    private final ValueBuffer buffer;
-
-    /**
-     * Constructor for AClusterEncoder. It's called by AClusterEncodingBuilder.
-     *
-     * @param dataType The data type of the time series, used for potential type-specific logic.
-     */
-    public AClusterEncoder(TSDataType dataType) {
-        super(TSEncoding.ACLUSTER);
-        switch (dataType) {
-            case INT32: case INT64:
-                this.buffer = new Int64Buffer();
-                break;
-            case FLOAT: case DOUBLE:
-                this.buffer = new DoubleBuffer();
-                break;
-            default:
-                throw new TsFileEncodingException("AClusterEncoder does not support data type: " + dataType);
-        }
+    @Override
+    public void add(int value) {
+      values.add((long) value);
     }
 
     @Override
-    public void encode(int value, ByteArrayOutputStream out) {
-        buffer.add(value);
+    public void add(long value) {
+      values.add(value);
     }
 
     @Override
-    public void encode(long value, ByteArrayOutputStream out) {
-        buffer.add(value);
+    public void add(float value) {
+      /* Do nothing, type mismatch */
     }
 
     @Override
-    public void encode(float value, ByteArrayOutputStream out) {
-        buffer.add(value);
+    public void add(double value) {
+      /* Do nothing, type mismatch */
     }
 
     @Override
-    public void encode(double value, ByteArrayOutputStream out) {
-        buffer.add(value);
+    public ProcessingResult processAndGet() { // implements the new interface method
+      long[] data = values.stream().mapToLong(l -> l).toArray();
+      return new ProcessingResult(data, 0); // Exponent is 0 for integers
     }
 
     @Override
-    public void flush(ByteArrayOutputStream out) throws IOException {
-        if (buffer.isEmpty()) {
-            return;
-        }
-
-        ProcessingResult procResult = buffer.processAndGet();
-        long[] originalData = procResult.getScaledLongs();
-        int scalingExponent = procResult.getScalingExponent();
-        if (originalData.length == 0) return;
-        long minVal = findMin(originalData);
-        long[] data = new long[originalData.length];
-        for (int i = 0; i < data.length; i++) {
-            data[i] = originalData[i] - minVal;
-        }
-
-        Object[] clusterResult = AClusterAlgorithm.run(data);
-        long[] sortedMedoids = (long[]) clusterResult[0];
-        int[] clusterAssignments = (int[]) clusterResult[1];
-        long[] clusterFrequencies = (long[]) clusterResult[2];
-
-        long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
-
-        encodeResults(out, scalingExponent, minVal, sortedMedoids, clusterFrequencies, sortedZigzagResiduals);
-
-        buffer.clear();
-    }
-
-    private static class MedoidFreqPair {
-        long medoid;
-        long frequency;
-        int originalIndex;
-
-        MedoidFreqPair(long medoid, long frequency, int originalIndex) {
-            this.medoid = medoid;
-            this.frequency = frequency;
-            this.originalIndex = originalIndex;
-        }
-    }
-
-    /**
-     * A direct translation of your `residualCalculationZigzagNoHuff_sorted` logic.
-     * It computes residuals, applies zigzag encoding, and groups them by cluster.
-     */
-    private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids, int[] assignments, long[] frequencies) {
-        int n = data.length;
-        int k = medoids.length;
-        if (n == 0) return new long[0];
-
-        long[] sortedResiduals = new long[n];
-        int[] writePointers = new int[k];
-        int cumulativeCount = 0;
-        for (int i = 0; i < k; i++) {
-            writePointers[i] = cumulativeCount;
-            cumulativeCount += (int) frequencies[i];
-        }
-
-        for (int i = 0; i < n; i++) {
-            int clusterId = assignments[i];
-            long medoid = medoids[clusterId];
-            long residual = data[i] - medoid;
-            long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
-
-            int targetIndex = writePointers[clusterId];
-            sortedResiduals[targetIndex] = zigzagResidual;
-            writePointers[clusterId]++;
-        }
-        return sortedResiduals;
-    }
-
-    private void encodeResults(
-            ByteArrayOutputStream out,
-            int scalingExponent,
-            long minVal,
-            long[] medoids,
-            long[] frequencies,
-            long[] residuals)
-            throws IOException {
-
-        ClusterSupport writer = new ClusterSupport(out);
-        int numPoints = residuals.length;
-        int k = medoids.length;
-
-        writer.write(scalingExponent, 8);
-        writer.write(k, 16);
-        writer.write(numPoints, 16);
-        writer.write(DEFAULT_PACK_SIZE, 16);
-
-        int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
-        writer.write(minValBit, 8);
-        writer.write(minVal>=0?0:1,1);
-        writer.write(minVal, minValBit);
-
-        if (k == 0) {
-            writer.flush();
-            return;
-        }
-
-        long minMedoid = findMin(medoids);
-        long[] medoidOffsets = new long[k];
-        int maxMedoidOffsetBits = 0;
-        for (int i = 0; i < k; i++) {
-            medoidOffsets[i] = medoids[i] - minMedoid;
-            maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
-        }
-        int minMedoidBit = ClusterSupport.bitsRequired(Math.abs(minMedoid));
-        writer.write(minMedoidBit,8);
-        writer.write(minMedoid>0?0:1,1);
-        writer.write(minMedoid, minMedoidBit);
-
-        writer.write(maxMedoidOffsetBits, 8);
-        for (long offset : medoidOffsets) {
-            writer.write(offset, maxMedoidOffsetBits);
-        }
-
-        long[] freqDeltas = new long[k];
-        if (k > 0) {
-            freqDeltas[0] = frequencies[0];
-            for (int i = 1; i < k; i++) {
-                freqDeltas[i] = frequencies[i] - frequencies[i - 1];
-            }
-        }
-        int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
-        writer.write(numFreqBlocks, 16);
-
-        int[] freqBlockMaxBits = new int[numFreqBlocks];
-        // Metadata pass for frequencies
-        for (int i = 0; i < numFreqBlocks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
-            long maxDelta = 0;
-            for (int j = start; j < end; j++) {
-                maxDelta = Math.max(maxDelta, freqDeltas[j]);
-            }
-            freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
-            writer.write(freqBlockMaxBits[i], 8);
-        }
-        // Data pass for frequencies
-        for (int i = 0; i < numFreqBlocks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
-            int bitsForBlock = freqBlockMaxBits[i];
-            for (int j = start; j < end; j++) {
-                writer.write(freqDeltas[j], bitsForBlock);
-            }
-        }
-
-        int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
-        writer.write(numPacks, 32);
-
-        int[] resPackMaxBits = new int[numPacks];
-        // Metadata pass for residuals
-        for (int i = 0; i < numPacks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
-            long maxOffset = 0;
-            for (int j = start; j < end; j++) {
-                maxOffset = Math.max(maxOffset, residuals[j]);
-            }
-            resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
-            writer.write(resPackMaxBits[i], 8);
-        }
-        // Data pass for residuals
-        for (int i = 0; i < numPacks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
-            int bitsForPack = resPackMaxBits[i];
-            if (bitsForPack > 0) {
-                for (int j = start; j < end; j++) {
-                    writer.write(residuals[j], bitsForPack);
-                }
-            }
-        }
-
-        writer.flush();
-    }
-
-    private long findMin(long[] data) {
-        if (data == null || data.length == 0) {
-            throw new IllegalArgumentException("Data array cannot be null or empty.");
-        }
-        long min = data[0];
-        for (int i = 1; i < data.length; i++) {
-            if (data[i] < min) {
-                min = data[i];
-            }
-        }
-        return min;
+    public void clear() {
+      values.clear();
     }
 
     @Override
-    public int getOneItemMaxSize() {
-        return 8;
+    public boolean isEmpty() {
+      return values.isEmpty();
     }
 
     @Override
-    public long getMaxByteSize() {
-        if (this.buffer.isEmpty()) {
-            return 0;
+    public int size() {
+      return values.size();
+    }
+  }
+
+  /** A buffer for FLOAT and DOUBLE types. It performs decimal scaling in processAndGet(). */
+  private static class DoubleBuffer implements ValueBuffer {
+    private final List<Double> values = new ArrayList<>();
+
+    @Override
+    public void add(int value) {
+      /* Do nothing, type mismatch */
+    }
+
+    @Override
+    public void add(long value) {
+      /* Do nothing, type mismatch */
+    }
+
+    @Override
+    public void add(float value) {
+      values.add((double) value); // store as double to unify float/double handling
+    }
+
+    @Override
+    public void add(double value) {
+      values.add(value);
+    }
+
+    @Override
+    public ProcessingResult processAndGet() {
+      // --- Edge Case: Handle empty buffer ---
+      if (values.isEmpty()) {
+        return new ProcessingResult(new long[0], 0);
+      }
+      int maxDecimalPlaces = 0;
+      for (double v : values) {
+        String s = BigDecimal.valueOf(v).toPlainString();
+        int dotIndex = s.indexOf('.');
+        if (dotIndex != -1) {
+          int decimalPlaces = s.length() - dotIndex - 1;
+          if (decimalPlaces > maxDecimalPlaces) {
+            maxDecimalPlaces = decimalPlaces;
+          }
         }
-        return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+      }
+
+      double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+      long[] scaledLongs = new long[values.size()];
+      for (int i = 0; i < values.size(); i++) {
+        scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+      }
+
+      return new ProcessingResult(scaledLongs, maxDecimalPlaces);
     }
 
     @Override
-    public void encode(boolean value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
+    public void clear() {
+      values.clear();
     }
 
     @Override
-    public void encode(short value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support short values.");
+    public boolean isEmpty() {
+      return values.isEmpty();
     }
 
     @Override
-    public void encode(Binary value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
+    public int size() {
+      return values.size();
+    }
+  }
+
+  private static class ProcessingResult {
+    final long[] scaledLongs;
+    final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+    ProcessingResult(long[] scaledLongs, int scalingExponent) {
+      this.scaledLongs = scaledLongs;
+      this.scalingExponent = scalingExponent;
     }
 
-    @Override
-    public void encode(BigDecimal value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
+    long[] getScaledLongs() {
+      return this.scaledLongs;
     }
+
+    int getScalingExponent() {
+      return this.scalingExponent;
+    }
+  }
+
+  private final ValueBuffer buffer;
+
+  /**
+   * Constructor for AClusterEncoder. It's called by AClusterEncodingBuilder.
+   *
+   * @param dataType The data type of the time series, used for potential type-specific logic.
+   */
+  public AClusterEncoder(TSDataType dataType) {
+    super(TSEncoding.ACLUSTER);
+    switch (dataType) {
+      case INT32:
+      case INT64:
+        this.buffer = new Int64Buffer();
+        break;
+      case FLOAT:
+      case DOUBLE:
+        this.buffer = new DoubleBuffer();
+        break;
+      default:
+        throw new TsFileEncodingException(
+            "AClusterEncoder does not support data type: " + dataType);
+    }
+  }
+
+  @Override
+  public void encode(int value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  @Override
+  public void encode(long value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  @Override
+  public void encode(float value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  @Override
+  public void encode(double value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  @Override
+  public void flush(ByteArrayOutputStream out) throws IOException {
+    if (buffer.isEmpty()) {
+      return;
+    }
+
+    ProcessingResult procResult = buffer.processAndGet();
+    long[] originalData = procResult.getScaledLongs();
+    int scalingExponent = procResult.getScalingExponent();
+    if (originalData.length == 0) return;
+    long minVal = findMin(originalData);
+    long[] data = new long[originalData.length];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = originalData[i] - minVal;
+    }
+
+    Object[] clusterResult = AClusterAlgorithm.run(data);
+    long[] sortedMedoids = (long[]) clusterResult[0];
+    int[] clusterAssignments = (int[]) clusterResult[1];
+    long[] clusterFrequencies = (long[]) clusterResult[2];
+
+    long[] sortedZigzagResiduals =
+        calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
+
+    encodeResults(
+        out, scalingExponent, minVal, sortedMedoids, clusterFrequencies, sortedZigzagResiduals);
+
+    buffer.clear();
+  }
+
+  private static class MedoidFreqPair {
+    long medoid;
+    long frequency;
+    int originalIndex;
+
+    MedoidFreqPair(long medoid, long frequency, int originalIndex) {
+      this.medoid = medoid;
+      this.frequency = frequency;
+      this.originalIndex = originalIndex;
+    }
+  }
+
+  /**
+   * A direct translation of the reference `residualCalculationZigzagNoHuff_sorted` logic. It
+   * computes residuals, applies zigzag encoding, and groups them by cluster.
+   */
+  private long[] calculateSortedZigzagResiduals(
+      long[] data, long[] medoids, int[] assignments, long[] frequencies) {
+    int n = data.length;
+    int k = medoids.length;
+    if (n == 0) return new long[0];
+
+    long[] sortedResiduals = new long[n];
+    int[] writePointers = new int[k];
+    int cumulativeCount = 0;
+    for (int i = 0; i < k; i++) {
+      writePointers[i] = cumulativeCount;
+      cumulativeCount += (int) frequencies[i];
+    }
+
+    for (int i = 0; i < n; i++) {
+      int clusterId = assignments[i];
+      long medoid = medoids[clusterId];
+      long residual = data[i] - medoid;
+      long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
+
+      int targetIndex = writePointers[clusterId];
+      sortedResiduals[targetIndex] = zigzagResidual;
+      writePointers[clusterId]++;
+    }
+    return sortedResiduals;
+  }
+
+  private void encodeResults(
+      ByteArrayOutputStream out,
+      int scalingExponent,
+      long minVal,
+      long[] medoids,
+      long[] frequencies,
+      long[] residuals)
+      throws IOException {
+
+    ClusterSupport writer = new ClusterSupport(out);
+    int numPoints = residuals.length;
+    int k = medoids.length;
+
+    writer.write(scalingExponent, 8);
+    writer.write(k, 16);
+    writer.write(numPoints, 16);
+    writer.write(DEFAULT_PACK_SIZE, 16);
+
+    int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
+    writer.write(minValBit, 8);
+    writer.write(minVal >= 0 ? 0 : 1, 1);
+    writer.write(Math.abs(minVal), minValBit); // magnitude only; the decoder re-applies the sign bit
+
+    if (k == 0) {
+      writer.flush();
+      return;
+    }
+
+    long minMedoid = findMin(medoids);
+    long[] medoidOffsets = new long[k];
+    int maxMedoidOffsetBits = 0;
+    for (int i = 0; i < k; i++) {
+      medoidOffsets[i] = medoids[i] - minMedoid;
+      maxMedoidOffsetBits =
+          Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
+    }
+    int minMedoidBit = ClusterSupport.bitsRequired(Math.abs(minMedoid));
+    writer.write(minMedoidBit, 8);
+    writer.write(minMedoid >= 0 ? 0 : 1, 1);
+    writer.write(Math.abs(minMedoid), minMedoidBit); // magnitude only, matching the decoder
+
+    writer.write(maxMedoidOffsetBits, 8);
+    for (long offset : medoidOffsets) {
+      writer.write(offset, maxMedoidOffsetBits);
+    }
+
+    long[] freqDeltas = new long[k];
+    if (k > 0) {
+      freqDeltas[0] = frequencies[0];
+      for (int i = 1; i < k; i++) {
+        freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+      }
+    }
+    int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+    writer.write(numFreqBlocks, 16);
+
+    int[] freqBlockMaxBits = new int[numFreqBlocks];
+    // Metadata pass for frequencies
+    for (int i = 0; i < numFreqBlocks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+      long maxDelta = 0;
+      for (int j = start; j < end; j++) {
+        maxDelta = Math.max(maxDelta, freqDeltas[j]);
+      }
+      freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+      writer.write(freqBlockMaxBits[i], 8);
+    }
+    // Data pass for frequencies
+    for (int i = 0; i < numFreqBlocks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+      int bitsForBlock = freqBlockMaxBits[i];
+      for (int j = start; j < end; j++) {
+        writer.write(freqDeltas[j], bitsForBlock);
+      }
+    }
+
+    int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+    writer.write(numPacks, 32);
+
+    int[] resPackMaxBits = new int[numPacks];
+    // Metadata pass for residuals
+    for (int i = 0; i < numPacks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+      long maxOffset = 0;
+      for (int j = start; j < end; j++) {
+        maxOffset = Math.max(maxOffset, residuals[j]);
+      }
+      resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+      writer.write(resPackMaxBits[i], 8);
+    }
+    // Data pass for residuals
+    for (int i = 0; i < numPacks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+      int bitsForPack = resPackMaxBits[i];
+      if (bitsForPack > 0) {
+        for (int j = start; j < end; j++) {
+          writer.write(residuals[j], bitsForPack);
+        }
+      }
+    }
+
+    writer.flush();
+  }
+
+  private long findMin(long[] data) {
+    if (data == null || data.length == 0) {
+      throw new IllegalArgumentException("Data array cannot be null or empty.");
+    }
+    long min = data[0];
+    for (int i = 1; i < data.length; i++) {
+      if (data[i] < min) {
+        min = data[i];
+      }
+    }
+    return min;
+  }
+
+  @Override
+  public int getOneItemMaxSize() {
+    return 8;
+  }
+
+  @Override
+  public long getMaxByteSize() {
+    if (this.buffer.isEmpty()) {
+      return 0;
+    }
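+    // Worst case: 8 bytes per buffered value, with a 50% allowance for
+    // headers and per-pack metadata.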
+    return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+  }
+
+  @Override
+  public void encode(boolean value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
+  }
+
+  @Override
+  public void encode(short value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support short values.");
+  }
+
+  @Override
+  public void encode(Binary value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
+  }
+
+  @Override
+  public void encode(BigDecimal value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
+  }
 }
-
-
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
index 6a189a9..32352c8 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.tsfile.encoding.encoder;
 
 import java.io.ByteArrayOutputStream;
@@ -5,73 +24,74 @@
 
 public class ClusterSupport {
 
-    private final ByteArrayOutputStream out;
-    private byte currentByte;
-    private int bitPosition; // 0-7, from left to right (MSB to LSB)
+  private final ByteArrayOutputStream out;
+  private byte currentByte;
+  private int bitPosition; // 0-7, from left to right (MSB to LSB)
 
-    public ClusterSupport(ByteArrayOutputStream out) {
-        this.out = out;
-        this.currentByte = 0;
-        this.bitPosition = 0;
-    }
+  public ClusterSupport(ByteArrayOutputStream out) {
+    this.out = out;
+    this.currentByte = 0;
+    this.bitPosition = 0;
+  }
 
-    /**
-     * Writes a value using a specified number of bits.
-     *
-     * @param value The long value to write. Only the lower `numBits` will be used.
-     * @param numBits The number of bits to write for the value (must be > 0 and <= 64).
-     * @throws IOException If an I/O error occurs.
-     */
-    public void write(long value, int numBits) throws IOException {
-        if (numBits <= 0 || numBits > 64) {
-            throw new IllegalArgumentException("Number of bits must be between 1 and 64.");
-        }
-        for (int i = numBits - 1; i >= 0; i--) {
-            // Get the i-th bit from the value
-            boolean bit = ((value >> i) & 1) == 1;
-            writeBit(bit);
-        }
+  /**
+   * Writes a value using a specified number of bits.
+   *
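+   * <p>Bits are emitted most-significant first: for example, {@code write(5, 4)} emits 0101.
+   *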
+   * @param value The long value to write. Only the lower `numBits` will be used.
+   * @param numBits The number of bits to write for the value (must be > 0 and <= 64).
+   * @throws IOException If an I/O error occurs.
+   */
+  public void write(long value, int numBits) throws IOException {
+    if (numBits <= 0 || numBits > 64) {
+      throw new IllegalArgumentException("Number of bits must be between 1 and 64.");
     }
+    for (int i = numBits - 1; i >= 0; i--) {
+      // Get the i-th bit from the value
+      boolean bit = ((value >> i) & 1) == 1;
+      writeBit(bit);
+    }
+  }
 
-    private void writeBit(boolean bit) throws IOException {
-        if (bit) {
-            currentByte |= (1 << (7 - bitPosition));
-        }
-        bitPosition++;
-        if (bitPosition == 8) {
-            out.write(currentByte);
-            currentByte = 0;
-            bitPosition = 0;
-        }
+  private void writeBit(boolean bit) throws IOException {
+    if (bit) {
+      currentByte |= (1 << (7 - bitPosition));
     }
+    bitPosition++;
+    if (bitPosition == 8) {
+      out.write(currentByte);
+      currentByte = 0;
+      bitPosition = 0;
+    }
+  }
 
-    /**
-     * Flushes any remaining bits in the current byte to the output stream.
-     * This must be called at the end to ensure all data is written.
-     * @throws IOException If an I/O error occurs.
-     */
-    public void flush() throws IOException {
-        if (bitPosition > 0) {
-            out.write(currentByte);
-        }
-        // It's good practice to reset, though not strictly necessary if the instance is discarded.
-        currentByte = 0;
-        bitPosition = 0;
+  /**
+   * Flushes any remaining bits in the current byte to the output stream. This must be called at the
+   * end to ensure all data is written.
+   *
+   * @throws IOException If an I/O error occurs.
+   */
+  public void flush() throws IOException {
+    if (bitPosition > 0) {
+      out.write(currentByte);
     }
+    // It's good practice to reset, though not strictly necessary if the instance is discarded.
+    currentByte = 0;
+    bitPosition = 0;
+  }
 
-    /**
-     * A helper to calculate the number of bits required for a non-negative long value.
-     *
-     * @param value The non-negative value.
-     * @return The number of bits required to represent it.
-     */
-    public static int bitsRequired(long value) {
-        if (value < 0) {
-            throw new IllegalArgumentException("Value must be non-negative.");
-        }
-        if (value == 0) {
-            return 1;
-        }
-        return 64 - Long.numberOfLeadingZeros(value);
+  /**
+   * A helper to calculate the number of bits required for a non-negative long value.
+   *
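+   * <p>For example, 0 and 1 both require 1 bit, while 5 (binary 101) requires 3.
+   *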
+   * @param value The non-negative value.
+   * @return The number of bits required to represent it.
+   */
+  public static int bitsRequired(long value) {
+    if (value < 0) {
+      throw new IllegalArgumentException("Value must be non-negative.");
     }
+    if (value == 0) {
+      return 1;
+    }
+    return 64 - Long.numberOfLeadingZeros(value);
+  }
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
index 53016aa..1288244 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
@@ -1,278 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.tsfile.encoding.encoder;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
 
 public class KClusterAlgorithm {
 
-    private static final Random rand = new Random();
+  private static final Random rand = new Random();
 
-    /**
-     * Private constructor to prevent instantiation.
-     */
-    private KClusterAlgorithm() {}
+  /** Private constructor to prevent instantiation. */
+  private KClusterAlgorithm() {}
 
-    public static Object[] run(long[] data, int k) {
-        if (data == null || data.length == 0) {
-            return new Object[] {new long[0], new int[0], new long[0]};
-        }
-
-        long[][] data2D = new long[data.length][1];
-        for (int i = 0; i < data.length; i++) {
-            data2D[i][0] = data[i];
-        }
-
-        return kMedoidLogCost(data, k, 2, 0.01);
+  public static Object[] run(long[] data, int k) {
+    if (data == null || data.length == 0) {
+      return new Object[] {new long[0], new int[0], new long[0]};
     }
 
-    /**
-     * Helper class for sorting medoids based on their cluster size (frequency).
-     */
-    private static class MedoidSortHelper implements Comparable<KClusterAlgorithm.MedoidSortHelper> {
-        long medoid;
-        long size;
-        int originalIndex;
-
-        MedoidSortHelper(long medoid, long size, int originalIndex) {
-            this.medoid = medoid;
-            this.size = size;
-            this.originalIndex = originalIndex;
-        }
-
-        @Override
-        public int compareTo(KClusterAlgorithm.MedoidSortHelper other) {
-            return Long.compare(this.size, other.size);
-        }
+    long[][] data2D = new long[data.length][1];
+    for (int i = 0; i < data.length; i++) {
+      data2D[i][0] = data[i];
     }
 
-    /**
-     * The core K-Medoids algorithm, specifically implemented for 1D data.
-     */
-    private static Object[] kMedoidLogCost(long[] data, int k, int maxIter, double tol) {
-        int n = data.length;
+    return kMedoidLogCost(data, k, 2, 0.01);
+  }
 
-        // Use a HashSet for efficient uniqueness checking on primitive longs.
-        Set<Long> uniquePoints = new HashSet<>();
-        for (long point : data) {
-            uniquePoints.add(point);
-            if (uniquePoints.size() > k) {
-                break;
-            }
-        }
-        int distinctCount = uniquePoints.size();
-        if (distinctCount < k) {
-            System.err.println("Warning: Distinct data points (" + distinctCount +
-                    ") is less than input k (" + k + "), setting k to " + distinctCount);
-            k = distinctCount;
-        }
+  /** Helper class for sorting medoids based on their cluster size (frequency). */
+  private static class MedoidSortHelper implements Comparable<KClusterAlgorithm.MedoidSortHelper> {
+    long medoid;
+    long size;
+    int originalIndex;
 
-        if (k <= 0) {
-            return new Object[]{new long[0], new int[n], new long[0]};
-        }
-
-        // 1. Initialize medoids using a K-Medoids++ style approach.
-        long[] medoids = acceleratedInitialization(data, k);
-        int[] clusterAssignment = new int[n];
-        long previousTotalCost = Long.MAX_VALUE;
-
-        // 2. Main iterative loop (Build and Swap phases).
-        for (int iteration = 0; iteration < maxIter; iteration++) {
-            // --- Assignment Step ---
-            long totalCostThisRound = 0L;
-            for (int i = 0; i < n; i++) {
-                long minCost = Long.MAX_VALUE;
-                int assignedMedoidIndex = -1;
-                for (int m = 0; m < k; m++) {
-                    long cost = calculateResidualCost(data[i], medoids[m]);
-                    if (cost < minCost) {
-                        minCost = cost;
-                        assignedMedoidIndex = m;
-                        if (minCost == 0) break; // Optimization: A perfect match is unbeatable.
-                    }
-                }
-                clusterAssignment[i] = assignedMedoidIndex;
-                totalCostThisRound += minCost;
-            }
-
-            // --- Convergence Check (Cost) ---
-            if (iteration > 0 && (previousTotalCost - totalCostThisRound) < tol) {
-                break;
-            }
-            previousTotalCost = totalCostThisRound;
-
-            // --- Update Step ---
-            long[] newMedoids = updateMedoids(data, clusterAssignment, k);
-
-            // --- Convergence Check (Medoids) ---
-            if (Arrays.equals(medoids, newMedoids)) {
-                break;
-            }
-            medoids = newMedoids;
-        }
-
-        // 3. Calculate final cluster sizes and sort results.
-        long[] finalClusterSizes = new long[k];
-        for (int assignment : clusterAssignment) {
-            if (assignment != -1) finalClusterSizes[assignment]++;
-        }
-        return sortResults(medoids, clusterAssignment, finalClusterSizes);
+    MedoidSortHelper(long medoid, long size, int originalIndex) {
+      this.medoid = medoid;
+      this.size = size;
+      this.originalIndex = originalIndex;
     }
 
-    /**
-     * K-Medoids++ style initialization for 1D data.
-     */
-    private static long[] acceleratedInitialization(long[] data, int k) {
-        long[] medoids = new long[k];
-        Set<Long> selectedMedoids = new HashSet<>();
+    @Override
+    public int compareTo(KClusterAlgorithm.MedoidSortHelper other) {
+      return Long.compare(this.size, other.size);
+    }
+  }
 
-        int firstIndex = rand.nextInt(data.length);
-        medoids[0] = data[firstIndex];
-        selectedMedoids.add(medoids[0]);
+  /** The core K-Medoids algorithm, specifically implemented for 1D data. */
+  private static Object[] kMedoidLogCost(long[] data, int k, int maxIter, double tol) {
+    int n = data.length;
 
-        long[] distances = new long[data.length];
-        for (int i = 0; i < data.length; i++) {
-            distances[i] = Math.abs(data[i] - medoids[0]);
-        }
-
-        for (int i = 1; i < k; i++) {
-            long[] prefixSums = new long[data.length];
-            prefixSums[0] = distances[0];
-            for (int p = 1; p < data.length; p++) {
-                prefixSums[p] = prefixSums[p - 1] + distances[p];
-            }
-            long totalDistance = prefixSums[data.length - 1];
-            if (totalDistance == 0) {
-                int idx = rand.nextInt(data.length);
-                while (selectedMedoids.contains(data[idx])) {
-                    idx = (idx + 1) % data.length;
-                }
-                medoids[i] = data[idx];
-                selectedMedoids.add(medoids[i]);
-                continue;
-            }
-
-            long randValue = (long) (rand.nextDouble() * totalDistance);
-            int chosenIdx = binarySearch(prefixSums, randValue);
-
-            while (selectedMedoids.contains(data[chosenIdx])) {
-                chosenIdx = (chosenIdx + 1) % data.length;
-            }
-            medoids[i] = data[chosenIdx];
-            selectedMedoids.add(medoids[i]);
-
-            for (int idx = 0; idx < data.length; idx++) {
-                long distNewMedoid = Math.abs(data[idx] - medoids[i]);
-                if (distNewMedoid < distances[idx]) {
-                    distances[idx] = distNewMedoid;
-                }
-            }
-        }
-        return medoids;
+    // Use a HashSet for efficient uniqueness checking on primitive longs.
+    Set<Long> uniquePoints = new HashSet<>();
+    for (long point : data) {
+      uniquePoints.add(point);
+      if (uniquePoints.size() > k) {
+        break;
+      }
+    }
+    int distinctCount = uniquePoints.size();
+    if (distinctCount < k) {
+      System.err.println(
+          "Warning: Distinct data points ("
+              + distinctCount
+              + ") is less than input k ("
+              + k
+              + "), setting k to "
+              + distinctCount);
+      k = distinctCount;
     }
 
-    /**
-     * Updates medoids by finding the point within each cluster that minimizes the total intra-cluster cost.
-     */
-    private static long[] updateMedoids(long[] data, int[] clusterAssignment, int k) {
-        long[] newMedoids = new long[k];
-        List<Long>[] clusterPoints = new ArrayList[k];
-        for (int i = 0; i < k; i++) {
-            clusterPoints[i] = new ArrayList<>();
-        }
-        for (int i = 0; i < data.length; i++) {
-            if (clusterAssignment[i] != -1) {
-                clusterPoints[clusterAssignment[i]].add(data[i]);
-            }
-        }
+    if (k <= 0) {
+      return new Object[] {new long[0], new int[n], new long[0]};
+    }
 
+    // 1. Initialize medoids using a K-Medoids++ style approach.
+    long[] medoids = acceleratedInitialization(data, k);
+    int[] clusterAssignment = new int[n];
+    long previousTotalCost = Long.MAX_VALUE;
+
+    // 2. Main iterative loop (Build and Swap phases).
+    for (int iteration = 0; iteration < maxIter; iteration++) {
+      // --- Assignment Step ---
+      long totalCostThisRound = 0L;
+      for (int i = 0; i < n; i++) {
+        long minCost = Long.MAX_VALUE;
+        int assignedMedoidIndex = -1;
         for (int m = 0; m < k; m++) {
-            List<Long> members = clusterPoints[m];
-            if (members.isEmpty()) continue;
-
-            long minTotalClusterCost = Long.MAX_VALUE;
-            long newMedoid = members.get(0);
-
-            for (Long candidate : members) {
-                long currentCandidateTotalCost = 0L;
-                for (Long otherMember : members) {
-                    currentCandidateTotalCost += calculateResidualCost(candidate, otherMember);
-                }
-                if (currentCandidateTotalCost < minTotalClusterCost) {
-                    minTotalClusterCost = currentCandidateTotalCost;
-                    newMedoid = candidate;
-                }
-            }
-            newMedoids[m] = newMedoid;
+          long cost = calculateResidualCost(data[i], medoids[m]);
+          if (cost < minCost) {
+            minCost = cost;
+            assignedMedoidIndex = m;
+            if (minCost == 0) break; // Optimization: A perfect match is unbeatable.
+          }
         }
-        return newMedoids;
+        clusterAssignment[i] = assignedMedoidIndex;
+        totalCostThisRound += minCost;
+      }
+
+      // --- Convergence Check (Cost) ---
+      if (iteration > 0 && (previousTotalCost - totalCostThisRound) < tol) {
+        break;
+      }
+      previousTotalCost = totalCostThisRound;
+
+      // --- Update Step ---
+      long[] newMedoids = updateMedoids(data, clusterAssignment, k);
+
+      // --- Convergence Check (Medoids) ---
+      if (Arrays.equals(medoids, newMedoids)) {
+        break;
+      }
+      medoids = newMedoids;
     }
 
-    /**
-     * Sorts the final medoids and their cluster information based on cluster frequency.
-     *
-     * @param medoids The discovered medoids.
-     * @param clusterAssignment The assignment map for each data point.
-     * @param clusterSize The frequency of each cluster.
-     * @return A sorted and correctly mapped Object array.
-     */
-    private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
-        int k = medoids.length;
-        List<KClusterAlgorithm.MedoidSortHelper> sorters = new ArrayList<>();
-        for (int i = 0; i < k; i++) {
-            sorters.add(new KClusterAlgorithm.MedoidSortHelper(medoids[i], clusterSize[i], i));
-        }
-        Collections.sort(sorters);
+    // 3. Calculate final cluster sizes and sort results.
+    long[] finalClusterSizes = new long[k];
+    for (int assignment : clusterAssignment) {
+      if (assignment != -1) finalClusterSizes[assignment]++;
+    }
+    return sortResults(medoids, clusterAssignment, finalClusterSizes);
+  }
 
-        long[] sortedMedoids = new long[k];
-        long[] sortedClusterSize = new long[k];
-        int[] oldToNewIndexMap = new int[k];
+  /** K-Medoids++ style initialization for 1D data. */
+  private static long[] acceleratedInitialization(long[] data, int k) {
+    long[] medoids = new long[k];
+    Set<Long> selectedMedoids = new HashSet<>();
 
-        for (int i = 0; i < k; i++) {
-            KClusterAlgorithm.MedoidSortHelper sortedItem = sorters.get(i);
-            sortedMedoids[i] = sortedItem.medoid;
-            sortedClusterSize[i] = sortedItem.size;
-            oldToNewIndexMap[sortedItem.originalIndex] = i;
-        }
+    int firstIndex = rand.nextInt(data.length);
+    medoids[0] = data[firstIndex];
+    selectedMedoids.add(medoids[0]);
 
-        int[] sortedClusterAssignment = new int[clusterAssignment.length];
-        for (int i = 0; i < clusterAssignment.length; i++) {
-            int oldIndex = clusterAssignment[i];
-            sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
-        }
-
-        return new Object[]{sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+    long[] distances = new long[data.length];
+    for (int i = 0; i < data.length; i++) {
+      distances[i] = Math.abs(data[i] - medoids[0]);
     }
 
-    // --- Cost Calculation Functions ---
-
-    private static long bitLengthCost(long value) {
-        if (value == 0) return 1;
-        return 64 - Long.numberOfLeadingZeros(value);
-    }
-
-    private static long calculateResidualCost(long p1, long p2) {
-        return 1 + bitLengthCost(Math.abs(p1 - p2));
-    }
-
-    private static int binarySearch(long[] prefixSums, long value) {
-        int low = 0;
-        int high = prefixSums.length - 1;
-        int ans = -1;
-        while (low <= high) {
-            int mid = low + (high - low) / 2;
-            if (prefixSums[mid] >= value) {
-                ans = mid;
-                high = mid - 1;
-            } else {
-                low = mid + 1;
-            }
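+    // K-Medoids++-style seeding: each subsequent medoid is sampled with
+    // probability proportional to a point's distance from its nearest chosen
+    // medoid, via a binary search over the prefix sums of those distances.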
+    for (int i = 1; i < k; i++) {
+      long[] prefixSums = new long[data.length];
+      prefixSums[0] = distances[0];
+      for (int p = 1; p < data.length; p++) {
+        prefixSums[p] = prefixSums[p - 1] + distances[p];
+      }
+      long totalDistance = prefixSums[data.length - 1];
+      if (totalDistance == 0) {
+        int idx = rand.nextInt(data.length);
+        while (selectedMedoids.contains(data[idx])) {
+          idx = (idx + 1) % data.length;
         }
-        return ans == -1 ? prefixSums.length - 1 : ans;
+        medoids[i] = data[idx];
+        selectedMedoids.add(medoids[i]);
+        continue;
+      }
+
+      long randValue = (long) (rand.nextDouble() * totalDistance);
+      int chosenIdx = binarySearch(prefixSums, randValue);
+
+      while (selectedMedoids.contains(data[chosenIdx])) {
+        chosenIdx = (chosenIdx + 1) % data.length;
+      }
+      medoids[i] = data[chosenIdx];
+      selectedMedoids.add(medoids[i]);
+
+      for (int idx = 0; idx < data.length; idx++) {
+        long distNewMedoid = Math.abs(data[idx] - medoids[i]);
+        if (distNewMedoid < distances[idx]) {
+          distances[idx] = distNewMedoid;
+        }
+      }
+    }
+    return medoids;
+  }
+
+  /**
+   * Updates medoids by finding the point within each cluster that minimizes the total intra-cluster
+   * cost.
+   */
+  private static long[] updateMedoids(long[] data, int[] clusterAssignment, int k) {
+    long[] newMedoids = new long[k];
+    List<Long>[] clusterPoints = new ArrayList[k];
+    for (int i = 0; i < k; i++) {
+      clusterPoints[i] = new ArrayList<>();
+    }
+    for (int i = 0; i < data.length; i++) {
+      if (clusterAssignment[i] != -1) {
+        clusterPoints[clusterAssignment[i]].add(data[i]);
+      }
     }
 
+    for (int m = 0; m < k; m++) {
+      List<Long> members = clusterPoints[m];
+      if (members.isEmpty()) continue;
+
+      long minTotalClusterCost = Long.MAX_VALUE;
+      long newMedoid = members.get(0);
+
+      for (Long candidate : members) {
+        long currentCandidateTotalCost = 0L;
+        for (Long otherMember : members) {
+          currentCandidateTotalCost += calculateResidualCost(candidate, otherMember);
+        }
+        if (currentCandidateTotalCost < minTotalClusterCost) {
+          minTotalClusterCost = currentCandidateTotalCost;
+          newMedoid = candidate;
+        }
+      }
+      newMedoids[m] = newMedoid;
+    }
+    return newMedoids;
+  }
+
+  /**
+   * Sorts the final medoids and their cluster information based on cluster frequency.
+   *
+   * @param medoids The discovered medoids.
+   * @param clusterAssignment The assignment map for each data point.
+   * @param clusterSize The frequency of each cluster.
+   * @return A sorted and correctly mapped Object array.
+   */
+  private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
+    int k = medoids.length;
+    List<KClusterAlgorithm.MedoidSortHelper> sorters = new ArrayList<>();
+    for (int i = 0; i < k; i++) {
+      sorters.add(new KClusterAlgorithm.MedoidSortHelper(medoids[i], clusterSize[i], i));
+    }
+    Collections.sort(sorters);
+
+    long[] sortedMedoids = new long[k];
+    long[] sortedClusterSize = new long[k];
+    int[] oldToNewIndexMap = new int[k];
+
+    for (int i = 0; i < k; i++) {
+      KClusterAlgorithm.MedoidSortHelper sortedItem = sorters.get(i);
+      sortedMedoids[i] = sortedItem.medoid;
+      sortedClusterSize[i] = sortedItem.size;
+      oldToNewIndexMap[sortedItem.originalIndex] = i;
+    }
+
+    int[] sortedClusterAssignment = new int[clusterAssignment.length];
+    for (int i = 0; i < clusterAssignment.length; i++) {
+      int oldIndex = clusterAssignment[i];
+      sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
+    }
+
+    return new Object[] {sortedMedoids, sortedClusterAssignment, sortedClusterSize};
+  }
+
+  // --- Cost Calculation Functions ---
+
+  private static long bitLengthCost(long value) {
+    if (value == 0) return 1;
+    return 64 - Long.numberOfLeadingZeros(value);
+  }
+
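+  // The cost of assigning a point to a medoid is the storage cost of its
+  // residual: one sign bit plus the magnitude's bit length, so the clustering
+  // objective directly tracks the size of the packed residual stream.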
+  private static long calculateResidualCost(long p1, long p2) {
+    return 1 + bitLengthCost(Math.abs(p1 - p2));
+  }
+
+  private static int binarySearch(long[] prefixSums, long value) {
+    int low = 0;
+    int high = prefixSums.length - 1;
+    int ans = -1;
+    while (low <= high) {
+      int mid = low + (high - low) / 2;
+      if (prefixSums[mid] >= value) {
+        ans = mid;
+        high = mid - 1;
+      } else {
+        low = mid + 1;
+      }
+    }
+    return ans == -1 ? prefixSums.length - 1 : ans;
+  }
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
index c24e90a..c8b347b 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.tsfile.encoding.encoder;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -9,373 +28,417 @@
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
-public class KClusterEncoder extends Encoder{
+public class KClusterEncoder extends Encoder {
 
-    private static final int DEFAULT_PACK_SIZE = 10;
-    private final int k;
-    private long[] scaledData;
-    private int scalingExponent;
-    private final ValueBuffer buffer;
+  private static final int DEFAULT_PACK_SIZE = 10;
+  private final int k;
+  private long[] scaledData;
+  private int scalingExponent;
+  private final ValueBuffer buffer;
 
-    // MODIFIED: Constructor takes 'k'
-    public KClusterEncoder(TSDataType dataType, int k) {
-        super(TSEncoding.KCLUSTER);
-        this.k = k;
-        switch (dataType) {
-            case INT32: case INT64:
-                this.buffer = new Int64Buffer();
-                break;
-            case FLOAT: case DOUBLE:
-                this.buffer = new DoubleBuffer();
-                break;
-            default:
-                throw new TsFileEncodingException("AClusterEncoder does not support data type: " + dataType);
-        }
+  // The constructor takes the target cluster count 'k'.
+  public KClusterEncoder(TSDataType dataType, int k) {
+    super(TSEncoding.KCLUSTER);
+    this.k = k;
+    switch (dataType) {
+      case INT32:
+      case INT64:
+        this.buffer = new Int64Buffer();
+        break;
+      case FLOAT:
+      case DOUBLE:
+        this.buffer = new DoubleBuffer();
+        break;
+      default:
+        throw new TsFileEncodingException(
+            "AClusterEncoder does not support data type: " + dataType);
     }
+  }
 
+  /** A buffer to store all values of a page before flushing. We use long to unify all types. */
+  private interface ValueBuffer {
+    /** Adds a value to the buffer. */
+    void add(int value);
 
-    /** A buffer to store all values of a page before flushing. We use long to unify all types. */
-    private interface ValueBuffer {
-        /** Adds a value to the buffer. */
-        void add(int value);
-        void add(long value);
-        void add(float value);
-        void add(double value);
+    void add(long value);
 
-        /** Clears the internal buffer. */
-        void clear();
+    void add(float value);
 
-        /** Checks if the buffer is empty. */
-        boolean isEmpty();
+    void add(double value);
 
-        /** Gets the current size of the buffer. */
-        int size();
+    /** Clears the internal buffer. */
+    void clear();
 
-        KClusterEncoder.ProcessingResult processAndGet();
-    }
+    /** Checks if the buffer is empty. */
+    boolean isEmpty();
 
-    private static class Int64Buffer implements KClusterEncoder.ValueBuffer {
-        private final List<Long> values = new ArrayList<>();
+    /** Gets the current size of the buffer. */
+    int size();
 
-        @Override
-        public void add(int value) { values.add((long) value); }
-        @Override
-        public void add(long value) { values.add(value); }
-        @Override
-        public void add(float value) { /* Do nothing, type mismatch */ }
-        @Override
-        public void add(double value) { /* Do nothing, type mismatch */ }
+    KClusterEncoder.ProcessingResult processAndGet();
+  }
 
-        @Override
-        public KClusterEncoder.ProcessingResult processAndGet() { // <--- 实现新方法
-            long[] data = values.stream().mapToLong(l -> l).toArray();
-            return new KClusterEncoder.ProcessingResult(data, 0); // Exponent is 0 for integers
-        }
-
-        @Override
-        public void clear() { values.clear(); }
-        @Override
-        public boolean isEmpty() { return values.isEmpty(); }
-        @Override
-        public int size() { return values.size(); }
-    }
-
-    /** A buffer for FLOAT and DOUBLE types. It performs scaling in processAndGetLongs(). */
-    private static class DoubleBuffer implements KClusterEncoder.ValueBuffer {
-        private final List<Double> values = new ArrayList<>();
-
-        @Override
-        public void add(int value) { /* Do nothing, type mismatch */ }
-        @Override
-        public void add(long value) { /* Do nothing, type mismatch */ }
-        @Override
-        public void add(float value) { values.add((double) value); } // Store as double to unify
-        @Override
-        public void add(double value) { values.add(value); }
-
-        @Override
-        public KClusterEncoder.ProcessingResult processAndGet() {
-            // --- Edge Case: Handle empty buffer ---
-            if (values.isEmpty()) {
-                return new KClusterEncoder.ProcessingResult(new long[0], 0);
-            }
-            int maxDecimalPlaces = 0;
-            for (double v : values) {
-                String s = BigDecimal.valueOf(v).toPlainString();
-                int dotIndex = s.indexOf('.');
-                if (dotIndex != -1) {
-                    int decimalPlaces = s.length() - dotIndex - 1;
-                    if (decimalPlaces > maxDecimalPlaces) {
-                        maxDecimalPlaces = decimalPlaces;
-                    }
-                }
-            }
-
-            double scalingFactor = Math.pow(10, maxDecimalPlaces);
-
-            long[] scaledLongs = new long[values.size()];
-            for (int i = 0; i < values.size(); i++) {
-                scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
-            }
-
-            return new KClusterEncoder.ProcessingResult(scaledLongs, maxDecimalPlaces);
-        }
-
-        @Override
-        public void clear() { values.clear(); }
-        @Override
-        public boolean isEmpty() { return values.isEmpty(); }
-        @Override
-        public int size() { return values.size(); }
-    }
-
-    private static class ProcessingResult {
-        final long[] scaledLongs;
-        final int scalingExponent; // e.g., 3 for a scaling factor of 1000
-
-        ProcessingResult(long[] scaledLongs, int scalingExponent) {
-            this.scaledLongs = scaledLongs;
-            this.scalingExponent = scalingExponent;
-        }
-
-        long[] getScaledLongs(){
-            return this.scaledLongs;
-        }
-
-        int getScalingExponent(){
-            return this.scalingExponent;
-        }
-    }
-
+  private static class Int64Buffer implements KClusterEncoder.ValueBuffer {
+    private final List<Long> values = new ArrayList<>();
 
     @Override
-    public void flush(ByteArrayOutputStream out) throws IOException {
-        if (buffer.isEmpty()) {
-            return;
-        }
-
-        ProcessingResult procResult = buffer.processAndGet();
-        long[] originalData = procResult.getScaledLongs();
-        int scalingExponent = procResult.getScalingExponent();
-        if (originalData.length == 0) return;
-        long minVal = findMin(originalData);
-        long[] data = new long[originalData.length];
-        for (int i = 0; i < data.length; i++) {
-            data[i] = originalData[i] - minVal;
-        }
-
-        Object[] clusterResult = KClusterAlgorithm.run(data,k);
-        long[] sortedMedoids = (long[]) clusterResult[0];
-        int[] clusterAssignments = (int[]) clusterResult[1];
-        long[] clusterFrequencies = (long[]) clusterResult[2];
-
-        long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
-
-        encodeResults(out, scalingExponent, minVal, sortedMedoids, clusterAssignments, sortedZigzagResiduals);
-
-        buffer.clear();
-    }
-
-
-    @Override
-    public void encode(int value, ByteArrayOutputStream out) {
-        buffer.add(value);
+    public void add(int value) {
+      values.add((long) value);
     }
 
     @Override
-    public void encode(long value, ByteArrayOutputStream out) {
-        buffer.add(value);
+    public void add(long value) {
+      values.add(value);
     }
 
     @Override
-    public void encode(float value, ByteArrayOutputStream out) {
-        buffer.add(value);
+    public void add(float value) {
+      /* Do nothing, type mismatch */
     }
 
     @Override
-    public void encode(double value, ByteArrayOutputStream out) {
-        buffer.add(value);
-    }
-
-    private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids, int[] assignments, long[] frequencies) {
-        int n = data.length;
-        int k = medoids.length;
-        if (n == 0) return new long[0];
-
-        long[] sortedResiduals = new long[n];
-        int[] writePointers = new int[k];
-        int cumulativeCount = 0;
-        for (int i = 0; i < k; i++) {
-            writePointers[i] = cumulativeCount;
-            cumulativeCount += (int) frequencies[i];
-        }
-
-        for (int i = 0; i < n; i++) {
-            int clusterId = assignments[i];
-            long medoid = medoids[clusterId];
-            long residual = data[i] - medoid;
-            long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
-
-            int targetIndex = writePointers[clusterId];
-            sortedResiduals[targetIndex] = zigzagResidual;
-            writePointers[clusterId]++;
-        }
-        return sortedResiduals;
-    }
-
-    private void encodeResults(
-            ByteArrayOutputStream out,
-            int scalingExponent,
-            long minVal,
-            long[] medoids,
-            int[] frequencies,
-            long[] residuals)
-            throws IOException {
-
-        ClusterSupport writer = new ClusterSupport(out);
-        int numPoints = frequencies.length;
-        int k = medoids.length;
-
-        writer.write(scalingExponent, 8);
-        writer.write(k, 16);
-        writer.write(numPoints, 16);
-        writer.write(DEFAULT_PACK_SIZE, 16);
-
-        int minValBit = ClusterSupport.bitsRequired(minVal);
-        writer.write(minValBit, 8);
-        writer.write(minVal>=0?0:1,1);
-        writer.write(minVal, minValBit);
-
-        if (k == 0) {
-            writer.flush();
-            return;
-        }
-
-        long minMedoid = findMin(medoids);
-        long[] medoidOffsets = new long[k];
-        int maxMedoidOffsetBits = 0;
-        for (int i = 0; i < k; i++) {
-            medoidOffsets[i] = medoids[i] - minMedoid;
-            maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
-        }
-        int minMedoidBit = ClusterSupport.bitsRequired(minMedoid);
-        writer.write(minMedoidBit,8);
-        writer.write(minMedoid>0?0:1,1);
-        writer.write(minMedoid, minMedoidBit);
-
-        writer.write(maxMedoidOffsetBits, 8);
-        for (long offset : medoidOffsets) {
-            writer.write(offset, maxMedoidOffsetBits);
-        }
-
-        long[] freqDeltas = new long[k];
-        if (k > 0) {
-            freqDeltas[0] = frequencies[0];
-            for (int i = 1; i < k; i++) {
-                freqDeltas[i] = frequencies[i] - frequencies[i - 1];
-            }
-        }
-        int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
-        writer.write(numFreqBlocks, 16);
-
-        int[] freqBlockMaxBits = new int[numFreqBlocks];
-        // Metadata pass for frequencies
-        for (int i = 0; i < numFreqBlocks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
-            long maxDelta = 0;
-            for (int j = start; j < end; j++) {
-                maxDelta = Math.max(maxDelta, freqDeltas[j]);
-            }
-            freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
-            writer.write(freqBlockMaxBits[i], 8);
-        }
-        // Data pass for frequencies
-        for (int i = 0; i < numFreqBlocks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
-            int bitsForBlock = freqBlockMaxBits[i];
-            for (int j = start; j < end; j++) {
-                writer.write(freqDeltas[j], bitsForBlock);
-            }
-        }
-
-        int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
-        writer.write(numPacks, 32);
-
-        int[] resPackMaxBits = new int[numPacks];
-        // Metadata pass for residuals
-        for (int i = 0; i < numPacks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
-            long maxOffset = 0;
-            for (int j = start; j < end; j++) {
-                maxOffset = Math.max(maxOffset, residuals[j]);
-            }
-            resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
-            writer.write(resPackMaxBits[i], 8);
-        }
-        // Data pass for residuals
-        for (int i = 0; i < numPacks; i++) {
-            int start = i * DEFAULT_PACK_SIZE;
-            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
-            int bitsForPack = resPackMaxBits[i];
-            if (bitsForPack > 0) {
-                for (int j = start; j < end; j++) {
-                    writer.write(residuals[j], bitsForPack);
-                }
-            }
-        }
-
-        writer.flush();
-    }
-
-    private long findMin(long[] data) {
-        if (data == null || data.length == 0) {
-            throw new IllegalArgumentException("Data array cannot be null or empty.");
-        }
-        long min = data[0];
-        for (int i = 1; i < data.length; i++) {
-            if (data[i] < min) {
-                min = data[i];
-            }
-        }
-        return min;
+    public void add(double value) {
+      /* Do nothing, type mismatch */
     }
 
     @Override
-    public int getOneItemMaxSize() {
-        return 8;
+    public KClusterEncoder.ProcessingResult processAndGet() { // Implements the buffer interface
+      long[] data = values.stream().mapToLong(l -> l).toArray();
+      return new KClusterEncoder.ProcessingResult(data, 0); // Exponent is 0 for integers
     }
 
     @Override
-    public long getMaxByteSize() {
-        if (this.buffer.isEmpty()) {
-            return 0;
+    public void clear() {
+      values.clear();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return values.isEmpty();
+    }
+
+    @Override
+    public int size() {
+      return values.size();
+    }
+  }
+
+  /** A buffer for FLOAT and DOUBLE types. It performs scaling in processAndGet(). */
+  private static class DoubleBuffer implements KClusterEncoder.ValueBuffer {
+    private final List<Double> values = new ArrayList<>();
+
+    @Override
+    public void add(int value) {
+      /* Do nothing, type mismatch */
+    }
+
+    @Override
+    public void add(long value) {
+      /* Do nothing, type mismatch */
+    }
+
+    @Override
+    public void add(float value) {
+      values.add((double) value); // Store as double to unify
+    }
+
+    @Override
+    public void add(double value) {
+      values.add(value);
+    }
+
+    @Override
+    public KClusterEncoder.ProcessingResult processAndGet() {
+      // --- Edge Case: Handle empty buffer ---
+      if (values.isEmpty()) {
+        return new KClusterEncoder.ProcessingResult(new long[0], 0);
+      }
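+      // e.g., {1.5, 2.25} -> maxDecimalPlaces = 2 -> factor 100 -> {150, 225};
+      // the decoder restores the originals by dividing by 10^scalingExponent.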
+      int maxDecimalPlaces = 0;
+      for (double v : values) {
+        String s = BigDecimal.valueOf(v).toPlainString();
+        int dotIndex = s.indexOf('.');
+        if (dotIndex != -1) {
+          int decimalPlaces = s.length() - dotIndex - 1;
+          if (decimalPlaces > maxDecimalPlaces) {
+            maxDecimalPlaces = decimalPlaces;
+          }
         }
-        return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+      }
+
+      double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+      long[] scaledLongs = new long[values.size()];
+      for (int i = 0; i < values.size(); i++) {
+        scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+      }
+
+      return new KClusterEncoder.ProcessingResult(scaledLongs, maxDecimalPlaces);
     }
 
     @Override
-    public void encode(boolean value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
+    public void clear() {
+      values.clear();
     }
 
     @Override
-    public void encode(short value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support short values.");
+    public boolean isEmpty() {
+      return values.isEmpty();
     }
 
     @Override
-    public void encode(Binary value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
+    public int size() {
+      return values.size();
+    }
+  }
+
+  private static class ProcessingResult {
+    final long[] scaledLongs;
+    final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+    ProcessingResult(long[] scaledLongs, int scalingExponent) {
+      this.scaledLongs = scaledLongs;
+      this.scalingExponent = scalingExponent;
     }
 
-    @Override
-    public void encode(BigDecimal value, ByteArrayOutputStream out) {
-        throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
+    long[] getScaledLongs() {
+      return this.scaledLongs;
     }
+
+    int getScalingExponent() {
+      return this.scalingExponent;
+    }
+  }
+
+  @Override
+  public void flush(ByteArrayOutputStream out) throws IOException {
+    if (buffer.isEmpty()) {
+      return;
+    }
+
+    ProcessingResult procResult = buffer.processAndGet();
+    long[] originalData = procResult.getScaledLongs();
+    int scalingExponent = procResult.getScalingExponent();
+    if (originalData.length == 0) return;
+    long minVal = findMin(originalData);
+    long[] data = new long[originalData.length];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = originalData[i] - minVal;
+    }
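+    // Shift by the page minimum so the clustered values are non-negative;
+    // minVal is written to the header and added back on decode.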
+
+    Object[] clusterResult = KClusterAlgorithm.run(data, k);
+    long[] sortedMedoids = (long[]) clusterResult[0];
+    int[] clusterAssignments = (int[]) clusterResult[1];
+    long[] clusterFrequencies = (long[]) clusterResult[2];
+
+    long[] sortedZigzagResiduals =
+        calculateSortedZigzagResiduals(data, sortedMedoids, clusterAssignments, clusterFrequencies);
+
+    encodeResults(
+        out, scalingExponent, minVal, sortedMedoids, clusterFrequencies, sortedZigzagResiduals);
+
+    buffer.clear();
+  }
+
+  @Override
+  public void encode(int value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  @Override
+  public void encode(long value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  @Override
+  public void encode(float value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  @Override
+  public void encode(double value, ByteArrayOutputStream out) {
+    buffer.add(value);
+  }
+
+  private long[] calculateSortedZigzagResiduals(
+      long[] data, long[] medoids, int[] assignments, long[] frequencies) {
+    int n = data.length;
+    int k = medoids.length;
+    if (n == 0) return new long[0];
+
+    long[] sortedResiduals = new long[n];
+    int[] writePointers = new int[k];
+    int cumulativeCount = 0;
+    for (int i = 0; i < k; i++) {
+      writePointers[i] = cumulativeCount;
+      cumulativeCount += (int) frequencies[i];
+    }
+
+    for (int i = 0; i < n; i++) {
+      int clusterId = assignments[i];
+      long medoid = medoids[clusterId];
+      long residual = data[i] - medoid;
+      long zigzagResidual = (residual << 1) ^ (residual >> 63); // Zigzag Encoding
+
+      int targetIndex = writePointers[clusterId];
+      sortedResiduals[targetIndex] = zigzagResidual;
+      writePointers[clusterId]++;
+    }
+    return sortedResiduals;
+  }
+
+  private void encodeResults(
+      ByteArrayOutputStream out,
+      int scalingExponent,
+      long minVal,
+      long[] medoids,
+      long[] frequencies,
+      long[] residuals)
+      throws IOException {
+
+    ClusterSupport writer = new ClusterSupport(out);
+    int numPoints = residuals.length;
+    int k = medoids.length;
+
+    writer.write(scalingExponent, 8);
+    writer.write(k, 16);
+    writer.write(numPoints, 16);
+    writer.write(DEFAULT_PACK_SIZE, 16);
+
+    int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
+    writer.write(minValBit, 8);
+    writer.write(minVal >= 0 ? 0 : 1, 1);
+    writer.write(Math.abs(minVal), minValBit); // magnitude; the sign bit precedes it
+
+    if (k == 0) {
+      writer.flush();
+      return;
+    }
+
+    long minMedoid = findMin(medoids);
+    long[] medoidOffsets = new long[k];
+    int maxMedoidOffsetBits = 0;
+    for (int i = 0; i < k; i++) {
+      medoidOffsets[i] = medoids[i] - minMedoid;
+      maxMedoidOffsetBits =
+          Math.max(maxMedoidOffsetBits, ClusterSupport.bitsRequired(medoidOffsets[i]));
+    }
+    int minMedoidBit = ClusterSupport.bitsRequired(Math.abs(minMedoid));
+    writer.write(minMedoidBit, 8);
+    writer.write(minMedoid >= 0 ? 0 : 1, 1);
+    writer.write(Math.abs(minMedoid), minMedoidBit); // magnitude; the sign bit precedes it
+
+    writer.write(maxMedoidOffsetBits, 8);
+    for (long offset : medoidOffsets) {
+      writer.write(offset, maxMedoidOffsetBits);
+    }
+
+    long[] freqDeltas = new long[k];
+    if (k > 0) {
+      freqDeltas[0] = frequencies[0];
+      for (int i = 1; i < k; i++) {
+        freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+      }
+    }
+    int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+    writer.write(numFreqBlocks, 16);
+
+    int[] freqBlockMaxBits = new int[numFreqBlocks];
+    // Metadata pass for frequencies
+    for (int i = 0; i < numFreqBlocks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+      long maxDelta = 0;
+      for (int j = start; j < end; j++) {
+        maxDelta = Math.max(maxDelta, freqDeltas[j]);
+      }
+      freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+      writer.write(freqBlockMaxBits[i], 8);
+    }
+    // Data pass for frequencies
+    for (int i = 0; i < numFreqBlocks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+      int bitsForBlock = freqBlockMaxBits[i];
+      for (int j = start; j < end; j++) {
+        writer.write(freqDeltas[j], bitsForBlock);
+      }
+    }
+
+    int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+    writer.write(numPacks, 32);
+
+    int[] resPackMaxBits = new int[numPacks];
+    // Metadata pass for residuals
+    for (int i = 0; i < numPacks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+      long maxOffset = 0;
+      for (int j = start; j < end; j++) {
+        maxOffset = Math.max(maxOffset, residuals[j]);
+      }
+      resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+      writer.write(resPackMaxBits[i], 8);
+    }
+    // Data pass for residuals
+    for (int i = 0; i < numPacks; i++) {
+      int start = i * DEFAULT_PACK_SIZE;
+      int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+      int bitsForPack = resPackMaxBits[i];
+      if (bitsForPack > 0) {
+        for (int j = start; j < end; j++) {
+          writer.write(residuals[j], bitsForPack);
+        }
+      }
+    }
+
+    writer.flush();
+  }
+
+  private long findMin(long[] data) {
+    if (data == null || data.length == 0) {
+      throw new IllegalArgumentException("Data array cannot be null or empty.");
+    }
+    long min = data[0];
+    for (int i = 1; i < data.length; i++) {
+      if (data[i] < min) {
+        min = data[i];
+      }
+    }
+    return min;
+  }
+
+  @Override
+  public int getOneItemMaxSize() {
+    return 8;
+  }
+
+  @Override
+  public long getMaxByteSize() {
+    if (this.buffer.isEmpty()) {
+      return 0;
+    }
+    return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+  }
+
+  @Override
+  public void encode(boolean value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
+  }
+
+  @Override
+  public void encode(short value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support short values.");
+  }
+
+  @Override
+  public void encode(Binary value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
+  }
+
+  @Override
+  public void encode(BigDecimal value, ByteArrayOutputStream out) {
+    throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
+  }
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
index 4a25c88..5bcb401 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -214,9 +214,7 @@
     }
 
     @Override
-    public void initFromProps(Map<String, String> props) {
-
-    }
+    public void initFromProps(Map<String, String> props) {}
   }
 
   public static class KCluster extends TSEncodingBuilder {
@@ -255,12 +253,12 @@
           int parsedK = Integer.parseInt(kStr);
           if (parsedK <= 0) {
             throw new IllegalArgumentException(
-                    "KCLUSTER parameter k must be a positive integer, but was " + parsedK);
+                "KCLUSTER parameter k must be a positive integer, but was " + parsedK);
           }
           this.k = parsedK;
         } catch (NumberFormatException e) {
           throw new IllegalArgumentException(
-                  "KCLUSTER parameter k must be an integer, but was " + kStr);
+              "KCLUSTER parameter k must be an integer, but was " + kStr);
         }
       }
     }
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
index fcc4dca..e1b6ea1 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
@@ -1,8 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.tsfile.encoding.decoder;
 
 import org.apache.tsfile.encoding.encoder.AClusterEncoder;
 import org.apache.tsfile.encoding.encoder.Encoder;
 import org.apache.tsfile.enums.TSDataType;
+
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -10,7 +30,11 @@
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -18,193 +42,195 @@
 
 /**
  * Test suite for AClusterEncoder and AClusterDecoder.
- * <p>
- * This test validates the end-to-end encoding and decoding process.
- * Since ACluster algorithm reorders data based on clusters, the validation
- * cannot compare original and decoded lists element by element. Instead, it
- * verifies that the set of unique values and their frequencies are identical
- * before and after the process.
- * </p>
- * <p>
- * The test structure is adapted to the iterator-style Decoder interface
- * (hasNext(buffer), readXXX(buffer)).
- * </p>
+ *
+ * <p>This test validates the end-to-end encoding and decoding process. Since the ACluster algorithm
+ * reorders data based on clusters, the validation cannot compare original and decoded lists element
+ * by element. Instead, it verifies that the set of unique values and their frequencies are
+ * identical before and after the process.
+ *
+ * <p>The test structure is adapted to the iterator-style Decoder interface (hasNext(buffer),
+ * readXXX(buffer)).
  */
 public class AClusterEncoderDecoderTest {
 
-    private static final int ROW_NUM = 1000;
-    private final Random ran = new Random();
+  private static final int ROW_NUM = 1000;
+  private final Random ran = new Random();
 
-    // =================================================================================
-    // Integer Tests
-    // =================================================================================
+  // =================================================================================
+  // Integer Tests
+  // =================================================================================
 
-    @Test
-    public void testIntBasicClusters() throws IOException {
-        List<Integer> data = new ArrayList<>();
-        // Three distinct clusters
-        for (int i = 0; i < 300; i++) data.add(100 + ran.nextInt(10)); // Cluster around 100
-        for (int i = 0; i < 400; i++) data.add(5000 + ran.nextInt(20)); // Cluster around 5000
-        for (int i = 0; i < 300; i++) data.add(100000 + ran.nextInt(5)); // Cluster around 100000
-        shouldReadAndWrite(data, TSDataType.INT32);
+  @Test
+  public void testIntBasicClusters() throws IOException {
+    List<Integer> data = new ArrayList<>();
+    // Three distinct clusters
+    for (int i = 0; i < 300; i++) data.add(100 + ran.nextInt(10)); // Cluster around 100
+    for (int i = 0; i < 400; i++) data.add(5000 + ran.nextInt(20)); // Cluster around 5000
+    for (int i = 0; i < 300; i++) data.add(100000 + ran.nextInt(5)); // Cluster around 100000
+    shouldReadAndWrite(data, TSDataType.INT32);
+  }
+
+  // =================================================================================
+  // Long Tests
+  // =================================================================================
+
+  @Test
+  public void testLongBasic() throws IOException {
+    List<Long> data = new ArrayList<>();
+    for (int i = 0; i < ROW_NUM; i++) {
+      data.add((long) i * i * i);
+    }
+    shouldReadAndWrite(data, TSDataType.INT64);
+  }
+
+  // =================================================================================
+  // Double Tests
+  // =================================================================================
+
+  @Test
+  public void testDoubleWithPrecision() throws IOException {
+    List<Double> data = new ArrayList<>();
+    final int precision = 6;
+
+    System.out.println(
+        "Testing double with controlled precision (max " + precision + " decimal places)...");
+
+    for (int i = 0; i < ROW_NUM / 2; i++) {
+      double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+      double rawValue = 123.456 + randomPart;
+
+      double cleanValue = cleanDouble(rawValue, precision + 3);
+      data.add(cleanValue);
     }
 
+    for (int i = 0; i < ROW_NUM / 2; i++) {
+      double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+      double rawValue = 9999.0 + randomPart;
 
-    // =================================================================================
-    // Long Tests
-    // =================================================================================
-
-    @Test
-    public void testLongBasic() throws IOException {
-        List<Long> data = new ArrayList<>();
-        for (int i = 0; i < ROW_NUM; i++) {
-            data.add((long) i * i * i);
-        }
-        shouldReadAndWrite(data, TSDataType.INT64);
+      double cleanValue = cleanDouble(rawValue, precision);
+      data.add(cleanValue);
     }
 
-    // =================================================================================
-    // Double Tests
-    // =================================================================================
-
-    @Test
-    public void testDoubleWithPrecision() throws IOException {
-        List<Double> data = new ArrayList<>();
-        final int precision = 6;
-
-        System.out.println("Testing double with controlled precision (max " + precision + " decimal places)...");
-
-        for (int i = 0; i < ROW_NUM / 2; i++) {
-            double randomPart = nextRandomDoubleWithPrecision(ran, precision);
-            double rawValue = 123.456 + randomPart;
-
-            double cleanValue = cleanDouble(rawValue, precision + 3);
-            data.add(cleanValue);
-        }
-
-        for (int i = 0; i < ROW_NUM / 2; i++) {
-            double randomPart = nextRandomDoubleWithPrecision(ran, precision);
-            double rawValue = 9999.0 + randomPart;
-
-            double cleanValue = cleanDouble(rawValue, precision);
-            data.add(cleanValue);
-        }
-
-        if (!data.isEmpty()) {
-            System.out.println("Sample generated data point (after cleaning): " + data.get(0));
-        }
-
-        shouldReadAndWrite(data, TSDataType.DOUBLE);
+    if (!data.isEmpty()) {
+      System.out.println("Sample generated data point (after cleaning): " + data.get(0));
     }
 
-    private double cleanDouble(double value, int maxPrecision) {
-        BigDecimal bd = new BigDecimal(value);
-        BigDecimal roundedBd = bd.setScale(maxPrecision, RoundingMode.HALF_UP);
-        return roundedBd.doubleValue();
-    }
-    // =================================================================================
-    // Edge Case Tests
-    // =================================================================================
+    shouldReadAndWrite(data, TSDataType.DOUBLE);
+  }
 
-    @Test
-    public void testSingleValue() throws IOException {
-        shouldReadAndWrite(Arrays.asList(123.00000001,123.00000002, 29.0001,29.0002,29.000001), TSDataType.DOUBLE);
+  private double cleanDouble(double value, int maxPrecision) {
+    BigDecimal bd = new BigDecimal(value);
+    BigDecimal roundedBd = bd.setScale(maxPrecision, RoundingMode.HALF_UP);
+    return roundedBd.doubleValue();
+  }
+
+  // =================================================================================
+  // Edge Case Tests
+  // =================================================================================
+
+  @Test
+  public void testNearDuplicateValues() throws IOException {
+    shouldReadAndWrite(
+        Arrays.asList(123.00000001, 123.00000002, 29.0001, 29.0002, 29.000001), TSDataType.DOUBLE);
+  }
+
+  @Test
+  public void testAllSameValues() throws IOException {
+    List<Integer> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) data.add(777);
+    shouldReadAndWrite(data, TSDataType.INT32);
+  }
+
+  // =================================================================================
+  // Core Test Logic and Helpers
+  // =================================================================================
+
+  private double nextRandomDoubleWithPrecision(Random random, int precision) {
+    if (precision < 0) {
+      throw new IllegalArgumentException("Precision must be non-negative.");
+    }
+    double factor = Math.pow(10, precision);
+
+    double scaled = random.nextDouble() * factor;
+    long rounded = Math.round(scaled);
+    return rounded / factor;
+  }
+
+  /** Generic helper to write a list of data using the appropriate encoder method. */
+  private <T extends Number> void writeData(
+      List<T> data, Encoder encoder, ByteArrayOutputStream out) throws IOException {
+    if (data.isEmpty()) {
+      return;
+    }
+    // Use instanceof to call the correct overloaded encode method
+    if (data.get(0) instanceof Integer) {
+      data.forEach(val -> encoder.encode((Integer) val, out));
+    } else if (data.get(0) instanceof Long) {
+      data.forEach(val -> encoder.encode((Long) val, out));
+    } else if (data.get(0) instanceof Float) {
+      data.forEach(val -> encoder.encode((Float) val, out));
+    } else if (data.get(0) instanceof Double) {
+      data.forEach(val -> encoder.encode((Double) val, out));
+    }
+    encoder.flush(out);
+  }
+
+  /**
+   * The main validation method. It encodes the given data, then decodes it, and finally compares
+   * the frequency maps of the original and decoded data.
+   */
+  private <T extends Number> void shouldReadAndWrite(List<T> originalData, TSDataType dataType)
+      throws IOException {
+    // 1. Prepare for encoding
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = new AClusterEncoder(dataType);
+
+    // 2. Encode the data
+    writeData(originalData, encoder, out);
+    ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
+
+    // 3. Decode the data using the iterator-style interface
+    Decoder decoder = new ClusterDecoder(dataType);
+    List<T> decodedData = new ArrayList<>();
+
+    while (decoder.hasNext(buffer)) {
+      switch (dataType) {
+        case INT32:
+          decodedData.add((T) Integer.valueOf(decoder.readInt(buffer)));
+          break;
+        case INT64:
+          decodedData.add((T) Long.valueOf(decoder.readLong(buffer)));
+          break;
+        case FLOAT:
+          decodedData.add((T) Float.valueOf(decoder.readFloat(buffer)));
+          break;
+        case DOUBLE:
+          decodedData.add((T) Double.valueOf(decoder.readDouble(buffer)));
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported data type for test");
+      }
     }
 
-    @Test
-    public void testAllSameValues() throws IOException {
-        List<Integer> data = new ArrayList<>();
-        for(int i = 0; i < 100; i++) data.add(777);
-        shouldReadAndWrite(data, TSDataType.INT32);
-    }
+    // 4. Validate the results
+    // First, a quick check on the total count
+    assertEquals(
+        "Decoded data size should match original data size",
+        originalData.size(),
+        decodedData.size());
 
-    // =================================================================================
-    // Core Test Logic and Helpers
-    // =================================================================================
+    // Second, the robust check using frequency maps
+    Map<T, Long> originalFrequencies = getFrequencyMap(originalData);
+    Map<T, Long> decodedFrequencies = getFrequencyMap(decodedData);
 
-    private double nextRandomDoubleWithPrecision(Random random, int precision) {
-        if (precision < 0) {
-            throw new IllegalArgumentException("Precision must be non-negative.");
-        }
-        double factor = Math.pow(10, precision);
+    assertEquals(
+        "Frequency maps of original and decoded data should be identical",
+        originalFrequencies,
+        decodedFrequencies);
+  }
 
-        double scaled = random.nextDouble() * factor;
-        long rounded = Math.round(scaled);
-        return rounded / factor;
-    }
-
-    /**
-     * Generic helper to write a list of data using the appropriate encoder method.
-     */
-    private <T extends Number> void writeData(List<T> data, Encoder encoder, ByteArrayOutputStream out) throws IOException {
-        if (data.isEmpty()) {
-            return;
-        }
-        // Use instanceof to call the correct overloaded encode method
-        if (data.get(0) instanceof Integer) {
-            data.forEach(val -> encoder.encode((Integer) val, out));
-        } else if (data.get(0) instanceof Long) {
-            data.forEach(val -> encoder.encode((Long) val, out));
-        } else if (data.get(0) instanceof Float) {
-            data.forEach(val -> encoder.encode((Float) val, out));
-        } else if (data.get(0) instanceof Double) {
-            data.forEach(val -> encoder.encode((Double) val, out));
-        }
-        encoder.flush(out);
-    }
-
-    /**
-     * The main validation method. It encodes the given data, then decodes it,
-     * and finally compares the frequency maps of the original and decoded data.
-     */
-    private <T extends Number> void shouldReadAndWrite(List<T> originalData, TSDataType dataType) throws IOException {
-        // 1. Prepare for encoding
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        Encoder encoder = new AClusterEncoder(dataType);
-
-        // 2. Encode the data
-        writeData(originalData, encoder, out);
-        ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
-
-        // 3. Decode the data using the iterator-style interface
-        Decoder decoder = new ClusterDecoder(dataType);
-        List<T> decodedData = new ArrayList<>();
-
-        while (decoder.hasNext(buffer)) {
-            switch (dataType) {
-                case INT32:
-                    decodedData.add((T) Integer.valueOf(decoder.readInt(buffer)));
-                    break;
-                case INT64:
-                    decodedData.add((T) Long.valueOf(decoder.readLong(buffer)));
-                    break;
-                case FLOAT:
-                    decodedData.add((T) Float.valueOf(decoder.readFloat(buffer)));
-                    break;
-                case DOUBLE:
-                    decodedData.add((T) Double.valueOf(decoder.readDouble(buffer)));
-                    break;
-                default:
-                    throw new UnsupportedOperationException("Unsupported data type for test");
-            }
-        }
-
-        // 4. Validate the results
-        // First, a quick check on the total count
-        assertEquals("Decoded data size should match original data size", originalData.size(), decodedData.size());
-
-        // Second, the robust check using frequency maps
-        Map<T, Long> originalFrequencies = getFrequencyMap(originalData);
-        Map<T, Long> decodedFrequencies = getFrequencyMap(decodedData);
-
-        assertEquals("Frequency maps of original and decoded data should be identical", originalFrequencies, decodedFrequencies);
-    }
-
-    /**
-     * Helper method to count frequencies of elements in a list.
-     */
-    private <T extends Number> Map<T, Long> getFrequencyMap(List<T> list) {
-        return list.stream()
-                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
-    }
-}
\ No newline at end of file
+  /** Helper method to count frequencies of elements in a list. */
+  private <T extends Number> Map<T, Long> getFrequencyMap(List<T> list) {
+    return list.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+  }
+}
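The frequency-map check at the heart of `shouldReadAndWrite` deserves a standalone illustration: because ACluster may emit values regrouped by cluster rather than in arrival order, the test compares two multisets instead of two sequences. A minimal sketch of that comparison, using the same `groupingBy`/`counting` collectors as `getFrequencyMap` (the "decoded" list here is a hypothetical reordering, not real decoder output):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FrequencyMapCheck {
  static <T> Map<T, Long> frequencies(List<T> list) {
    return list.stream()
        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }

  public static void main(String[] args) {
    List<Integer> original = Arrays.asList(100, 5000, 100, 5000, 5000);
    // Hypothetical decoder output: same values, regrouped by cluster.
    List<Integer> decoded = Arrays.asList(100, 100, 5000, 5000, 5000);

    System.out.println(original.equals(decoded)); // false: order differs
    System.out.println(frequencies(original).equals(frequencies(decoded))); // true
  }
}
```

This is why an element-by-element assertion would spuriously fail here even when the codec round-trips every value correctly.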
diff --git a/pom.xml b/pom.xml
index 1ee0d80..9aa7a33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -323,6 +323,16 @@
                                     <excludeCoordinates>
                                         <!-- TODO: For this CVE no fix exists yet (Keep an eye on it) -->
                                         <exclude>
+                                            <groupId>org.apache.commons</groupId>
+                                            <artifactId>commons-lang3</artifactId>
+                                            <version>3.15.0</version>
+                                        </exclude>
+                                        <exclude>
+                                            <groupId>com.google.code.gson</groupId>
+                                            <artifactId>gson</artifactId>
+                                            <version>2.10.1</version>
+                                        </exclude>
+                                        <exclude>
                                             <groupId>io.netty</groupId>
                                             <artifactId>netty-handler</artifactId>
                                             <version>4.1.97.Final</version>