| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
#include <stdio.h>

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <limits>
#include <memory>
#include <random>
#include <string>
#include <vector>
| |
| #include "arrow/test-util.h" |
| #include "arrow/util/compression.h" |
| #include "arrow/util/compression_snappy.h" |
| |
| #include "parquet/encoding-internal.h" |
| #include "parquet/util/logging.h" |
| #include "parquet/util/stopwatch.h" |
| |
| /** |
| * Test bed for encodings and some utilities to measure their throughput. |
| * TODO: this file needs some major cleanup. |
| */ |
| |
| class DeltaBitPackEncoder { |
| public: |
| explicit DeltaBitPackEncoder(int mini_block_size = 8) { |
| mini_block_size_ = mini_block_size; |
| } |
| |
| void Add(int64_t v) { values_.push_back(v); } |
| |
| uint8_t* Encode(int* encoded_len) { |
| uint8_t* result = new uint8_t[10 * 1024 * 1024]; |
| int num_mini_blocks = static_cast<int>(arrow::BitUtil::CeilDiv(num_values() - 1, |
| mini_block_size_)); |
| uint8_t* mini_block_widths = NULL; |
| |
| arrow::BitWriter writer(result, 10 * 1024 * 1024); |
| |
| // Writer the size of each block. We only use 1 block currently. |
| writer.PutVlqInt(static_cast<uint32_t>(num_mini_blocks * mini_block_size_)); |
| |
| // Write the number of mini blocks. |
| writer.PutVlqInt(static_cast<uint32_t>(num_mini_blocks)); |
| |
| // Write the number of values. |
| writer.PutVlqInt(num_values() - 1); |
| |
| // Write the first value. |
| writer.PutZigZagVlqInt(static_cast<uint32_t>(values_[0])); |
| |
| // Compute the values as deltas and the min delta. |
| int64_t min_delta = std::numeric_limits<int64_t>::max(); |
| for (size_t i = values_.size() - 1; i > 0; --i) { |
| values_[i] -= values_[i - 1]; |
| min_delta = std::min(min_delta, values_[i]); |
| } |
| |
| // Write out the min delta. |
| writer.PutZigZagVlqInt(static_cast<int32_t>(min_delta)); |
| |
| // We need to save num_mini_blocks bytes to store the bit widths of the mini |
| // blocks. |
| mini_block_widths = writer.GetNextBytePtr(num_mini_blocks); |
| |
| int idx = 1; |
| for (int i = 0; i < num_mini_blocks; ++i) { |
| int n = std::min(mini_block_size_, num_values() - idx); |
| |
| // Compute the max delta in this mini block. |
| int64_t max_delta = std::numeric_limits<int64_t>::min(); |
| for (int j = 0; j < n; ++j) { |
| max_delta = std::max(values_[idx + j], max_delta); |
| } |
| |
| // The bit width for this block is the number of bits needed to store |
| // (max_delta - min_delta). |
| int bit_width = arrow::BitUtil::NumRequiredBits(max_delta - min_delta); |
| mini_block_widths[i] = static_cast<uint8_t>(bit_width); |
| |
| // Encode this mini blocking using min_delta and bit_width |
| for (int j = 0; j < n; ++j) { |
| writer.PutValue(values_[idx + j] - min_delta, bit_width); |
| } |
| |
| // Pad out the last block. |
| for (int j = n; j < mini_block_size_; ++j) { |
| writer.PutValue(0, bit_width); |
| } |
| idx += n; |
| } |
| |
| writer.Flush(); |
| *encoded_len = writer.bytes_written(); |
| return result; |
| } |
| |
| int num_values() const { return static_cast<int>(values_.size()); } |
| |
| private: |
| int mini_block_size_; |
| std::vector<int64_t> values_; |
| }; |
| |
| class DeltaLengthByteArrayEncoder { |
| public: |
| explicit DeltaLengthByteArrayEncoder(int mini_block_size = 8) |
| : len_encoder_(mini_block_size), |
| buffer_(new uint8_t[10 * 1024 * 1024]), |
| offset_(0), |
| plain_encoded_len_(0) {} |
| |
| void Add(const std::string& s) { |
| Add(reinterpret_cast<const uint8_t*>(s.data()), static_cast<int>(s.size())); |
| } |
| |
| void Add(const uint8_t* ptr, int len) { |
| plain_encoded_len_ += static_cast<int>(len + sizeof(int)); |
| len_encoder_.Add(len); |
| memcpy(buffer_ + offset_, ptr, len); |
| offset_ += len; |
| } |
| |
| uint8_t* Encode(int* encoded_len) { |
| uint8_t* encoded_lengths = len_encoder_.Encode(encoded_len); |
| memmove(buffer_ + *encoded_len + sizeof(int), buffer_, offset_); |
| memcpy(buffer_, encoded_len, sizeof(int)); |
| memcpy(buffer_ + sizeof(int), encoded_lengths, *encoded_len); |
| *encoded_len += static_cast<int>(offset_ + sizeof(int)); |
| return buffer_; |
| } |
| |
| int num_values() const { return len_encoder_.num_values(); } |
| int plain_encoded_len() const { return plain_encoded_len_; } |
| |
| private: |
| DeltaBitPackEncoder len_encoder_; |
| uint8_t* buffer_; |
| int offset_; |
| int plain_encoded_len_; |
| }; |
| |
| class DeltaByteArrayEncoder { |
| public: |
| DeltaByteArrayEncoder() : plain_encoded_len_(0) {} |
| |
| void Add(const std::string& s) { |
| plain_encoded_len_ += static_cast<int>(s.size() + sizeof(int)); |
| int min_len = static_cast<int>(std::min(s.size(), last_value_.size())); |
| int prefix_len = 0; |
| for (int i = 0; i < min_len; ++i) { |
| if (s[i] == last_value_[i]) { |
| ++prefix_len; |
| } else { |
| break; |
| } |
| } |
| prefix_len_encoder_.Add(prefix_len); |
| suffix_encoder_.Add(reinterpret_cast<const uint8_t*>(s.data()) + prefix_len, |
| static_cast<int>(s.size() - prefix_len)); |
| last_value_ = s; |
| } |
| |
| uint8_t* Encode(int* encoded_len) { |
| int prefix_buffer_len; |
| uint8_t* prefix_buffer = prefix_len_encoder_.Encode(&prefix_buffer_len); |
| int suffix_buffer_len; |
| uint8_t* suffix_buffer = suffix_encoder_.Encode(&suffix_buffer_len); |
| |
| uint8_t* buffer = new uint8_t[10 * 1024 * 1024]; |
| memcpy(buffer, &prefix_buffer_len, sizeof(int)); |
| memcpy(buffer + sizeof(int), prefix_buffer, prefix_buffer_len); |
| memcpy(buffer + sizeof(int) + prefix_buffer_len, suffix_buffer, suffix_buffer_len); |
| *encoded_len = static_cast<int>(sizeof(int) + prefix_buffer_len + suffix_buffer_len); |
| return buffer; |
| } |
| |
| int num_values() const { return prefix_len_encoder_.num_values(); } |
| int plain_encoded_len() const { return plain_encoded_len_; } |
| |
| private: |
| DeltaBitPackEncoder prefix_len_encoder_; |
| DeltaLengthByteArrayEncoder suffix_encoder_; |
| std::string last_value_; |
| int plain_encoded_len_; |
| }; |
| |
| uint64_t TestPlainIntEncoding(const uint8_t* data, int num_values, int batch_size) { |
| uint64_t result = 0; |
| parquet::PlainDecoder<parquet::Int64Type> decoder(nullptr); |
| decoder.SetData(num_values, data, static_cast<int>(num_values * sizeof(int64_t))); |
| std::vector<int64_t> values(batch_size); |
| for (int i = 0; i < num_values;) { |
| int n = decoder.Decode(values.data(), batch_size); |
| for (int j = 0; j < n; ++j) { |
| result += values[j]; |
| } |
| i += n; |
| } |
| return result; |
| } |
| |
| uint64_t TestBinaryPackedEncoding(const char* name, const std::vector<int64_t>& values, |
| int benchmark_iters = -1, |
| int benchmark_batch_size = 1) { |
| int mini_block_size; |
| if (values.size() < 8) { |
| mini_block_size = 8; |
| } else if (values.size() < 16) { |
| mini_block_size = 16; |
| } else { |
| mini_block_size = 32; |
| } |
| parquet::DeltaBitPackDecoder<parquet::Int64Type> decoder(nullptr); |
| DeltaBitPackEncoder encoder(mini_block_size); |
| for (size_t i = 0; i < values.size(); ++i) { |
| encoder.Add(values[i]); |
| } |
| |
| int raw_len = static_cast<int>(encoder.num_values() * sizeof(int)); |
| int len; |
| uint8_t* buffer = encoder.Encode(&len); |
| |
| if (benchmark_iters == -1) { |
| printf("%s\n", name); |
| printf(" Raw len: %d\n", raw_len); |
| printf(" Encoded len: %d (%0.2f%%)\n", len, |
| static_cast<float>(len) * 100.0f / static_cast<float>(raw_len)); |
| decoder.SetData(encoder.num_values(), buffer, len); |
| for (int i = 0; i < encoder.num_values(); ++i) { |
| int64_t x = 0; |
| decoder.Decode(&x, 1); |
| if (values[i] != x) { |
| std::cerr << "Bad: " << i << std::endl; |
| std::cerr << " " << x << " != " << values[i] << std::endl; |
| break; |
| } |
| } |
| return 0; |
| } else { |
| printf("%s\n", name); |
| printf(" Raw len: %d\n", raw_len); |
| printf(" Encoded len: %d (%0.2f%%)\n", len, |
| static_cast<float>(len) * 100.0f / static_cast<float>(raw_len)); |
| |
| uint64_t result = 0; |
| std::vector<int64_t> buf(benchmark_batch_size); |
| parquet::StopWatch sw; |
| sw.Start(); |
| for (int k = 0; k < benchmark_iters; ++k) { |
| decoder.SetData(encoder.num_values(), buffer, len); |
| for (size_t i = 0; i < values.size();) { |
| int n = decoder.Decode(buf.data(), benchmark_batch_size); |
| for (int j = 0; j < n; ++j) { |
| result += buf[j]; |
| } |
| i += n; |
| } |
| } |
| uint64_t elapsed = sw.Stop(); |
| double num_ints = static_cast<double>(values.size() * benchmark_iters) * 1000.; |
| printf("%s rate (batch size = %2d): %0.3fM per second.\n", name, benchmark_batch_size, |
| num_ints / static_cast<double>(elapsed)); |
| return result; |
| } |
| } |
| |
// Times NUM_ITERS calls of FN over the first NUM_VALUES elements of DATA and
// prints the resulting rate, labelled with NAME and BATCH_SIZE.
//
// The expansion relies on names supplied by the enclosing scope: `sw` (an
// object with Start()/Stop()), `elapsed`, `mult`, `NUM_ITERS` and `NUM_VALUES`.
// It is wrapped in do { ... } while (0) so that `DECODE_TEST(...);` behaves as
// a single statement (e.g. as the body of an unbraced if/else).
#define DECODE_TEST(NAME, FN, DATA, BATCH_SIZE)                                      \
  do {                                                                               \
    sw.Start();                                                                      \
    for (int decode_test_iter_ = 0; decode_test_iter_ < NUM_ITERS;                   \
         ++decode_test_iter_) {                                                      \
      FN(reinterpret_cast<uint8_t*>(&(DATA)[0]), NUM_VALUES, (BATCH_SIZE));          \
    }                                                                                \
    elapsed = sw.Stop();                                                             \
    printf("%s rate (batch size = %2d): %0.3fM per second.\n", (NAME), (BATCH_SIZE), \
           mult / static_cast<double>(elapsed));                                     \
  } while (0)
| |
| void TestPlainIntCompressed(::arrow::Codec* codec, const std::vector<int64_t>& data, |
| int num_iters, int batch_size) { |
| const uint8_t* raw_data = reinterpret_cast<const uint8_t*>(&data[0]); |
| int uncompressed_len = static_cast<int>(data.size() * sizeof(int64_t)); |
| uint8_t* decompressed_data = new uint8_t[uncompressed_len]; |
| |
| int64_t max_compressed_size = codec->MaxCompressedLen(uncompressed_len, raw_data); |
| uint8_t* compressed_data = new uint8_t[max_compressed_size]; |
| int64_t compressed_len; |
| DCHECK(codec |
| ->Compress(uncompressed_len, raw_data, max_compressed_size, compressed_data, |
| &compressed_len) |
| .ok()); |
| |
| printf("\n%s:\n Uncompressed len: %d\n Compressed len: %d\n", codec->name(), |
| uncompressed_len, static_cast<int>(compressed_len)); |
| |
| double mult = static_cast<double>(num_iters * data.size()) * 1000.; |
| parquet::StopWatch sw; |
| sw.Start(); |
| uint64_t r = 0; |
| for (int i = 0; i < num_iters; ++i) { |
| ABORT_NOT_OK(codec->Decompress(compressed_len, compressed_data, uncompressed_len, |
| decompressed_data)); |
| r += TestPlainIntEncoding(decompressed_data, static_cast<int>(data.size()), |
| batch_size); |
| } |
| int64_t elapsed = sw.Stop(); |
| printf("Compressed(%s) plain int rate (batch size = %2d): %0.3fM per second.\n", |
| codec->name(), batch_size, mult / static_cast<double>(elapsed)); |
| |
| delete[] compressed_data; |
| delete[] decompressed_data; |
| } |
| |
| void TestBinaryPacking() { |
| std::vector<int64_t> values; |
| values.clear(); |
| for (int i = 0; i < 100; ++i) values.push_back(0); |
| TestBinaryPackedEncoding("Zeros", values); |
| |
| values.clear(); |
| for (int i = 1; i <= 5; ++i) values.push_back(i); |
| TestBinaryPackedEncoding("Example 1", values); |
| |
| values.clear(); |
| values.push_back(7); |
| values.push_back(5); |
| values.push_back(3); |
| values.push_back(1); |
| values.push_back(2); |
| values.push_back(3); |
| values.push_back(4); |
| values.push_back(5); |
| TestBinaryPackedEncoding("Example 2", values); |
| |
| // Test rand ints between 0 and 10K |
| values.clear(); |
| int seed = 0; |
| std::mt19937 gen(seed); |
| std::uniform_int_distribution<int> d(0, 10000); |
| for (int i = 0; i < 500000; ++i) { |
| values.push_back(d(gen)); |
| } |
| TestBinaryPackedEncoding("Rand [0, 10000)", values); |
| |
| // Test rand ints between 0 and 100 |
| values.clear(); |
| std::uniform_int_distribution<int> d1(0, 100); |
| for (int i = 0; i < 500000; ++i) { |
| values.push_back(d1(gen)); |
| } |
| TestBinaryPackedEncoding("Rand [0, 100)", values); |
| } |
| |
| void TestDeltaLengthByteArray() { |
| parquet::DeltaLengthByteArrayDecoder decoder(nullptr); |
| DeltaLengthByteArrayEncoder encoder; |
| |
| std::vector<std::string> values; |
| values.push_back("Hello"); |
| values.push_back("World"); |
| values.push_back("Foobar"); |
| values.push_back("ABCDEF"); |
| |
| for (size_t i = 0; i < values.size(); ++i) { |
| encoder.Add(values[i]); |
| } |
| |
| int len = 0; |
| uint8_t* buffer = encoder.Encode(&len); |
| printf("DeltaLengthByteArray\n Raw len: %d\n Encoded len: %d\n", |
| encoder.plain_encoded_len(), len); |
| decoder.SetData(encoder.num_values(), buffer, len); |
| for (int i = 0; i < encoder.num_values(); ++i) { |
| parquet::ByteArray v = {0, NULL}; |
| decoder.Decode(&v, 1); |
| std::string r = std::string(reinterpret_cast<const char*>(v.ptr), v.len); |
| if (r != values[i]) { |
| std::cout << "Bad " << r << " != " << values[i] << std::endl; |
| } |
| } |
| } |
| |
| void TestDeltaByteArray() { |
| parquet::DeltaByteArrayDecoder decoder(nullptr); |
| DeltaByteArrayEncoder encoder; |
| |
| std::vector<std::string> values; |
| |
| // Wikipedia example |
| values.push_back("myxa"); |
| values.push_back("myxophyta"); |
| values.push_back("myxopod"); |
| values.push_back("nab"); |
| values.push_back("nabbed"); |
| values.push_back("nabbing"); |
| values.push_back("nabit"); |
| values.push_back("nabk"); |
| values.push_back("nabob"); |
| values.push_back("nacarat"); |
| values.push_back("nacelle"); |
| |
| for (size_t i = 0; i < values.size(); ++i) { |
| encoder.Add(values[i]); |
| } |
| |
| int len = 0; |
| uint8_t* buffer = encoder.Encode(&len); |
| printf("DeltaLengthByteArray\n Raw len: %d\n Encoded len: %d\n", |
| encoder.plain_encoded_len(), len); |
| decoder.SetData(encoder.num_values(), buffer, len); |
| for (int i = 0; i < encoder.num_values(); ++i) { |
| parquet::ByteArray v; |
| decoder.Decode(&v, 1); |
| std::string r = std::string(reinterpret_cast<const char*>(v.ptr), v.len); |
| if (r != values[i]) { |
| std::cout << "Bad " << r << " != " << values[i] << std::endl; |
| } |
| } |
| } |
| |
| int main(int argc, char** argv) { |
| TestBinaryPacking(); |
| TestDeltaLengthByteArray(); |
| TestDeltaByteArray(); |
| |
| parquet::StopWatch sw; |
| uint64_t elapsed = 0; |
| |
| const int NUM_VALUES = 1024 * 1024; |
| const int NUM_ITERS = 10; |
| const double mult = NUM_VALUES * NUM_ITERS * 1000.; |
| |
| std::vector<int64_t> plain_int_data; |
| plain_int_data.resize(NUM_VALUES); |
| |
| DECODE_TEST("Plain decoder", TestPlainIntEncoding, plain_int_data, 1); |
| DECODE_TEST("Plain decoder", TestPlainIntEncoding, plain_int_data, 16); |
| DECODE_TEST("Plain decoder", TestPlainIntEncoding, plain_int_data, 32); |
| DECODE_TEST("Plain decoder", TestPlainIntEncoding, plain_int_data, 64); |
| |
| // Test rand ints between 0 and 10K |
| std::vector<int64_t> values; |
| int seed = 0; |
| std::mt19937 gen(seed); |
| std::uniform_int_distribution<int> d(0, 10000); |
| for (int i = 0; i < 1000000; ++i) { |
| values.push_back(d(gen)); |
| } |
| TestBinaryPackedEncoding("Rand 0-10K", values, 100, 1); |
| TestBinaryPackedEncoding("Rand 0-10K", values, 100, 16); |
| TestBinaryPackedEncoding("Rand 0-10K", values, 100, 32); |
| TestBinaryPackedEncoding("Rand 0-10K", values, 100, 64); |
| |
| ::arrow::SnappyCodec snappy_codec; |
| |
| TestPlainIntCompressed(&snappy_codec, values, 100, 1); |
| TestPlainIntCompressed(&snappy_codec, values, 100, 16); |
| TestPlainIntCompressed(&snappy_codec, values, 100, 32); |
| TestPlainIntCompressed(&snappy_codec, values, 100, 64); |
| |
| return 0; |
| } |