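Batched decode: let DeltaBitPackDecoder decode a 32-value miniblock of 14- or 15-bit deltas in one call, and include the individual encoding headers directly instead of through encodings.h.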
diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc
index c386444..beb6577 100644
--- a/example/decode_benchmark.cc
+++ b/example/decode_benchmark.cc
@@ -18,7 +18,11 @@
 #include "example_util.h"
 
 #include "compression/codec.h"
+#include "encodings/delta-bit-pack-encoding.h"
+#include "encodings/delta-byte-array-encoding.h"
+#include "encodings/delta-length-byte-array-encoding.h"
 #include "encodings/encodings.h"
+#include "encodings/plain-encoding.h"
 #include "util/stopwatch.h"
 
 using namespace impala;
@@ -228,7 +232,7 @@
     encoder.Add(values[i]);
   }
 
-  int raw_len = encoder.num_values() * sizeof(int);
+  int raw_len = encoder.num_values() * sizeof(int64_t);
   int len;
   uint8_t* buffer = encoder.Encode(&len);
 
@@ -237,13 +241,33 @@
   printf(" Raw len: %d\n", raw_len);
   printf(" Encoded len: %d (%0.2f%%)\n", len, len * 100 / (float)raw_len);
   decoder.SetData(encoder.num_values(), buffer, len);
-  for (int i = 0; i < encoder.num_values(); ++i) {
-    int64_t x = 0;
-    decoder.GetInt64(&x, 1);
-    if (values[i] != x) {
-      cerr << "Bad: " << i << endl;
-      cerr << " " << x << " != " << values[i] << endl;
-      break;
-    }
-  }
+  // Verify the round trip. The first value is requested on its own (the
+  // batches after it are presumably meant to line up with miniblocks), then
+  // benchmark_batch_size values at a time. The decoder may return fewer
+  // values than requested, so advance by the count it actually returned.
+  // Stops at the first mismatch, or if the decoder returns nothing.
+  const int batch_size = benchmark_batch_size > 0 ? benchmark_batch_size : 1;
+  std::vector<int64_t> x(batch_size);
+  int request = 1;
+  for (int i = 0; i < encoder.num_values();) {
+    int n = decoder.GetInt64(&x[0], request);
+    if (n <= 0) {
+      cerr << "Bad: decoder returned no values at " << i << endl;
+      break;
+    }
+    int bad = -1;
+    for (int j = 0; j < n; ++j) {
+      if (values[i + j] != x[j]) {
+        bad = j;
+        break;
+      }
+    }
+    if (bad >= 0) {
+      cerr << "Bad: " << i + bad << endl;
+      cerr << " " << x[bad] << " != " << values[i + bad] << endl;
+      break;
+    }
+    i += n;
+    request = batch_size;
+  }
   return 0;
@@ -417,9 +441,12 @@
 }
 
 int main(int argc, char** argv) {
+  DeltaBitPackDecoder::Init();
+  /*
   TestBinaryPacking();
   TestDeltaLengthByteArray();
   TestDeltaByteArray();
+  */
 
   StopWatch sw;
   uint64_t elapsed = 0;
@@ -441,7 +468,7 @@
   for (int i = 0; i < 1000000; ++i) {
     values.push_back(rand() % 10000);
   }
-  //TestBinaryPackedEncoding("Rand 0-10K", values, 1, 1);
+  //TestBinaryPackedEncoding("Rand 0-10K", values, -1, 32);
   //return 0;
   TestBinaryPackedEncoding("Rand 0-10K", values, 100, 1);
   TestBinaryPackedEncoding("Rand 0-10K", values, 100, 16);
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 82725d7..c3d5727 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -14,6 +14,7 @@
 
 add_library(Parquet STATIC
   parquet.cc
+  encodings/delta-bit-pack-encoding.cc
 )
 
 add_subdirectory(compression)
diff --git a/src/encodings/delta-bit-pack-encoding.cc b/src/encodings/delta-bit-pack-encoding.cc
new file mode 100644
index 0000000..646558d
--- /dev/null
+++ b/src/encodings/delta-bit-pack-encoding.cc
@@ -0,0 +1,22 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "encodings/delta-bit-pack-encoding.h"
+
+using namespace parquet_cpp;
+
+// Storage for the batched-decode offset tables declared in the header. They
+// start zero-initialized and are filled in by DeltaBitPackDecoder::Init().
+DeltaBitPackDecoder::DecodeData DeltaBitPackDecoder::decode_data_14_[32];
+DeltaBitPackDecoder::DecodeData DeltaBitPackDecoder::decode_data_15_[32];
diff --git a/src/encodings/delta-bit-pack-encoding.h b/src/encodings/delta-bit-pack-encoding.h
index 3197a86..44f5a24 100644
--- a/src/encodings/delta-bit-pack-encoding.h
+++ b/src/encodings/delta-bit-pack-encoding.h
@@ -47,6 +47,14 @@
     return GetInternal(buffer, max_values);
   }
 
+  // Fills in the offset tables used by the batched miniblock decode. Call once
+  // before decoding; until then the batched path is skipped and every value
+  // is decoded one at a time.
+  static void Init() {
+    Init(decode_data_14_, 14);
+    Init(decode_data_15_, 15);
+  }
+
  private:
   void InitBlock() {
     uint64_t block_size;
@@ -86,7 +94,17 @@
         }
       }
 
-      // TODO: the key to this algorithm is to decode the entire miniblock at once.
+      // Fast path: decode 32 deltas at once when at least that many remain in
+      // the current miniblock and the caller's buffer has room for them. If
+      // DecodeMiniBlock32() cannot handle the current position it consumes
+      // nothing and returns false, and we fall through to the scalar path.
+      if (values_current_mini_block_ >= 32 && max_values - i >= 32 &&
+          DecodeMiniBlock32(buffer + i)) {
+        values_current_mini_block_ -= 32;
+        i += 31;  // The loop's ++i accounts for the 32nd value.
+        continue;
+      }
+
       int64_t delta;
       if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
       delta += min_delta_;
@@ -110,6 +128,75 @@
   int delta_bit_width_;
 
   int64_t last_value_;
+
+  // Bit position of one value within a run of 32 packed deltas.
+  struct DecodeData {
+    int byte_offset;  // Byte that holds the value's lowest bit.
+    int shift;        // Position of that bit within the byte.
+  };
+
+  // Per-value offsets for runs of 14- and 15-bit deltas, filled by Init().
+  static DecodeData decode_data_14_[32];
+  static DecodeData decode_data_15_[32];
+
+  // Decodes the next 32 packed deltas of the current miniblock into 'buffer',
+  // applying min_delta_ and accumulating onto last_value_ as the one-at-a-time
+  // path does, and advances the reader past them. Returns false, consuming
+  // nothing, if the bit width has no offset table, Init() has not run, the
+  // reader is not on a byte boundary, or too few bytes remain for the 4-byte
+  // loads in DecodeBlock().
+  template <typename T>
+  bool DecodeMiniBlock32(T* buffer) {
+    const DecodeData* offsets;
+    if (delta_bit_width_ == 14) {
+      offsets = decode_data_14_;
+    } else if (delta_bit_width_ == 15) {
+      offsets = decode_data_15_;
+    } else {
+      return false;
+    }
+    // The tables are all zeros until Init() runs; once filled, the last entry
+    // always has a nonzero byte offset.
+    if (offsets[31].byte_offset == 0) return false;
+
+    // 32 values of any width occupy a whole number of bytes.
+    const int num_bytes = 32 * delta_bit_width_ / 8;
+    const uint8_t* data = decoder_.current_ptr();
+    // The 4-byte load for the last value can extend up to 3 bytes past the run.
+    if (data == NULL || decoder_.bytes_left() < num_bytes + 3) return false;
+
+    DecodeBlock(offsets, data, buffer);
+    for (int i = 0; i < 32; ++i) {
+      last_value_ += min_delta_ + static_cast<int64_t>(buffer[i]);
+      buffer[i] = last_value_;
+    }
+    decoder_.SkipBytes(num_bytes);
+    return true;
+  }
+
+  // Unpacks 32 values of delta_bit_width_ bits each, located by 'offsets',
+  // from 'data' into 'buffer'. Each value comes from one unaligned 4-byte
+  // load (little-endian host assumed), which covers widths up to 25 bits.
+  template <typename T>
+  void DecodeBlock(const DecodeData* offsets, const uint8_t* data, T* buffer) {
+    const uint32_t mask = (1u << delta_bit_width_) - 1;
+    for (int i = 0; i < 32; ++i) {
+      uint32_t word;
+      memcpy(&word, data + offsets[i].byte_offset, sizeof(word));
+      buffer[i] = static_cast<T>((word >> offsets[i].shift) & mask);
+    }
+  }
+
+  // Fills 'data' with the byte offset and in-byte shift of each of 32
+  // consecutive values that are 'bit_width' bits wide.
+  static void Init(DecodeData* data, int bit_width) {
+    int bit_offset = 0;
+    for (int i = 0; i < 32; ++i) {
+      data[i].byte_offset = bit_offset / 8;
+      data[i].shift = bit_offset % 8;
+      bit_offset += bit_width;
+    }
+  }
 };
 
 }
diff --git a/src/encodings/delta-byte-array-encoding.h b/src/encodings/delta-byte-array-encoding.h
index cdbbfde..b629c15 100644
--- a/src/encodings/delta-byte-array-encoding.h
+++ b/src/encodings/delta-byte-array-encoding.h
@@ -15,7 +15,8 @@
 #ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
 #define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
 
-#include "encodings.h"
+#include "encodings/delta-bit-pack-encoding.h"
+#include "encodings/delta-length-byte-array-encoding.h"
 
 namespace parquet_cpp {
 
diff --git a/src/encodings/encodings.h b/src/encodings/encodings.h
index e888c1f..e0b162a 100644
--- a/src/encodings/encodings.h
+++ b/src/encodings/encodings.h
@@ -22,6 +22,8 @@
 #include "impala/rle-encoding.h"
 #include "impala/bit-stream-utils.inline.h"
 
+#include "parquet/parquet.h"
+
 namespace parquet_cpp {
 
 class Decoder {
@@ -72,12 +74,5 @@
 
 }
 
-#include "bool-encoding.h"
-#include "plain-encoding.h"
-#include "dictionary-encoding.h"
-#include "delta-bit-pack-encoding.h"
-#include "delta-length-byte-array-encoding.h"
-#include "delta-byte-array-encoding.h"
-
 #endif
 
diff --git a/src/impala/bit-stream-utils.h b/src/impala/bit-stream-utils.h
index 5eba254..e10b5b9 100644
--- a/src/impala/bit-stream-utils.h
+++ b/src/impala/bit-stream-utils.h
@@ -125,6 +125,28 @@
   // there may be an additional fraction of a byte).
   int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
 
+  // Returns a pointer to the next unread byte, or NULL if the reader is partway
+  // through a byte. As in bytes_left(), the read position is byte_offset_ plus
+  // bit_offset_ bits.
+  const uint8_t* current_ptr() const {
+    if (bit_offset_ % 8 != 0) return NULL;
+    return buffer_ + byte_offset_ + bit_offset_ / 8;
+  }
+
+  // Moves to the next byte boundary and then skips 'num_bytes' more bytes,
+  // reloading buffered_values_ from there. Returns false, leaving the reader
+  // unchanged, if that would move past the end of the buffer.
+  bool SkipBytes(int num_bytes) {
+    int new_offset = byte_offset_ + BitUtil::Ceil(bit_offset_, 8) + num_bytes;
+    if (num_bytes < 0 || new_offset > max_bytes_) return false;
+    byte_offset_ = new_offset;
+    bit_offset_ = 0;
+    buffered_values_ = 0;
+    int bytes_to_buffer = std::min(8, max_bytes_ - byte_offset_);
+    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_to_buffer);
+    return true;
+  }
+
   // Maximum byte length of a vlq encoded int
   static const int MAX_VLQ_BYTE_LEN = 5;
 
diff --git a/src/parquet.cc b/src/parquet.cc
index 67d9caf..0404e66 100644
--- a/src/parquet.cc
+++ b/src/parquet.cc
@@ -13,8 +13,11 @@
 // limitations under the License.
 
 #include "parquet/parquet.h"
-#include "encodings/encodings.h"
 #include "compression/codec.h"
+#include "encodings/bool-encoding.h"
+#include "encodings/encodings.h"
+#include "encodings/dictionary-encoding.h"
+#include "encodings/plain-encoding.h"
 
 #include <string>
 #include <string.h>