Batched decode: add a 32-value miniblock fast path to DeltaBitPackDecoder (bit widths 14 and 15).
diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc
index c386444..beb6577 100644
--- a/example/decode_benchmark.cc
+++ b/example/decode_benchmark.cc
@@ -18,7 +18,11 @@
#include "example_util.h"
#include "compression/codec.h"
+#include "encodings/delta-bit-pack-encoding.h"
+#include "encodings/delta-byte-array-encoding.h"
+#include "encodings/delta-length-byte-array-encoding.h"
#include "encodings/encodings.h"
+#include "encodings/plain-encoding.h"
#include "util/stopwatch.h"
using namespace impala;
@@ -228,7 +232,7 @@
encoder.Add(values[i]);
}
- int raw_len = encoder.num_values() * sizeof(int);
+ int raw_len = encoder.num_values() * sizeof(int64_t);
int len;
uint8_t* buffer = encoder.Encode(&len);
@@ -237,13 +241,17 @@
printf(" Raw len: %d\n", raw_len);
printf(" Encoded len: %d (%0.2f%%)\n", len, len * 100 / (float)raw_len);
decoder.SetData(encoder.num_values(), buffer, len);
- for (int i = 0; i < encoder.num_values(); ++i) {
- int64_t x = 0;
- decoder.GetInt64(&x, 1);
- if (values[i] != x) {
- cerr << "Bad: " << i << endl;
- cerr << " " << x << " != " << values[i] << endl;
- break;
+ int64_t v;
+ decoder.GetInt64(&v, 1);
+
+ for (int i = 1; i < encoder.num_values(); i += benchmark_batch_size) {
+ int64_t x[benchmark_batch_size];
+ int n = decoder.GetInt64(x, benchmark_batch_size);
+ for (int j = 0; j < n; ++j) {
+ if (values[i + j] != x[j]) {
+ cerr << "Bad: " << i + j << endl;
+ cerr << " " << x[j] << " != " << values[i + j] << endl;
+ }
}
}
return 0;
@@ -417,9 +425,12 @@
}
int main(int argc, char** argv) {
+ DeltaBitPackDecoder::Init();
+ /*
TestBinaryPacking();
TestDeltaLengthByteArray();
TestDeltaByteArray();
+ */
StopWatch sw;
uint64_t elapsed = 0;
@@ -441,7 +452,7 @@
for (int i = 0; i < 1000000; ++i) {
values.push_back(rand() % 10000);
}
- //TestBinaryPackedEncoding("Rand 0-10K", values, 1, 1);
+ //TestBinaryPackedEncoding("Rand 0-10K", values, -1, 32);
//return 0;
TestBinaryPackedEncoding("Rand 0-10K", values, 100, 1);
TestBinaryPackedEncoding("Rand 0-10K", values, 100, 16);
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 82725d7..c3d5727 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -14,6 +14,7 @@
add_library(Parquet STATIC
parquet.cc
+ encodings/delta-bit-pack-encoding.cc
)
add_subdirectory(compression)
diff --git a/src/encodings/delta-bit-pack-encoding.cc b/src/encodings/delta-bit-pack-encoding.cc
new file mode 100644
index 0000000..646558d
--- /dev/null
+++ b/src/encodings/delta-bit-pack-encoding.cc
@@ -0,0 +1,21 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "encodings/delta-bit-pack-encoding.h"
+
+namespace parquet_cpp {
+
+// Out-of-class definitions for the per-bit-width unpacking tables declared in
+// delta-bit-pack-encoding.h. They have static storage, so they start out
+// zero-filled until DeltaBitPackDecoder::Init() populates them.
+DeltaBitPackDecoder::DecodeData DeltaBitPackDecoder::decode_data_14_[32];
+DeltaBitPackDecoder::DecodeData DeltaBitPackDecoder::decode_data_15_[32];
+
+namespace {
+
+// Populates the tables during static initialization of this translation unit.
+// Without this, any program that forgets to call DeltaBitPackDecoder::Init()
+// would run the 32-value fast path against all-zero offsets and silently
+// produce wrong values. Init() recomputes the same deterministic offsets, so an
+// additional explicit call by the application remains harmless.
+// NOTE(review): code that decodes during *another* TU's static initialization
+// still needs to call Init() itself (cross-TU init order is unspecified).
+struct DecodeTableInitializer {
+  DecodeTableInitializer() { DeltaBitPackDecoder::Init(); }
+};
+
+DecodeTableInitializer decode_table_initializer;
+
+}  // namespace
+
+}  // namespace parquet_cpp
+
diff --git a/src/encodings/delta-bit-pack-encoding.h b/src/encodings/delta-bit-pack-encoding.h
index 3197a86..44f5a24 100644
--- a/src/encodings/delta-bit-pack-encoding.h
+++ b/src/encodings/delta-bit-pack-encoding.h
@@ -47,6 +47,11 @@
return GetInternal(buffer, max_values);
}
+  // Builds the unpacking tables used by the 32-value miniblock fast path
+  // (bit widths 14 and 15). The fast path in GetInternal reads these tables,
+  // which are all-zero until this has run.
+  static void Init() {
+    DecodeData* const tables[] = {decode_data_14_, decode_data_15_};
+    const int bit_widths[] = {14, 15};
+    for (int k = 0; k < 2; ++k) {
+      Init(tables[k], bit_widths[k]);
+    }
+  }
+
private:
void InitBlock() {
uint64_t block_size;
@@ -86,7 +91,26 @@
}
}
- // TODO: the key to this algorithm is to decode the entire miniblock at once.
+ if (max_values == 32) {
+ if (delta_bit_width_ == 14) {
+ const uint8_t* data = decoder_.current_ptr();
+ DecodeBlock<T>(decode_data_14_, data, buffer);
+ } else if (delta_bit_width_ == 15) {
+ const uint8_t* data = decoder_.current_ptr();
+ DecodeBlock<T>(decode_data_15_, data, buffer);
+ } else {
+ // TODO: the key to this algorithm is to decode the entire miniblock at once.
+ ParquetException::NYI("miniblocks");
+ }
+ for (int i = 0; i < 32; ++i) {
+ buffer[i] = last_value_ + min_delta_ + buffer[i];
+ last_value_ = buffer[i];
+ }
+ decoder_.SkipBytes(32 * delta_bit_width_ / 8);
+ values_current_mini_block_ = 0;
+ num_values_ -= 32;
+ return 32;
+ }
int64_t delta;
if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
delta += min_delta_;
@@ -110,6 +134,45 @@
int delta_bit_width_;
int64_t last_value_;
+
+ // Location of the i-th value of a 32-value miniblock, as computed by
+ // Init(DecodeData*, int) from the value's first bit index (i * bit_width):
+ // the byte containing that bit (relative to the miniblock start) and the
+ // bit's position within that byte.
+ struct DecodeData {
+ int byte_offset;  // first bit index / 8
+ int shift;  // first bit index % 8
+ };
+
+ // Lookup tables for bit widths 14 and 15, filled in by the public Init().
+ // They have static storage, so every entry is zero until Init() has run.
+ // Their types must match the out-of-class definitions in the .cc file.
+ static DecodeData decode_data_14_[32];
+ static DecodeData decode_data_15_[32];
+
+  // Unpacks one miniblock of 32 bit-packed values, each delta_bit_width_ bits
+  // wide, into buffer[0..31]. `offsets` must be a table produced by
+  // Init(DecodeData*, int) for the same bit width, and `data` must point at the
+  // first byte of the miniblock.
+  //
+  // Each value is assembled from only the bytes it actually spans, least
+  // significant byte first. This reproduces what the previous unaligned load
+  // produced on little-endian hosts, while:
+  //  - avoiding the unaligned, type-punned reinterpret_cast load (undefined
+  //    behavior),
+  //  - never reading past the 4 * delta_bit_width_ bytes of the miniblock
+  //    (the old sizeof(T)-byte load overran it on the last few values), and
+  //  - being independent of host byte order.
+  // Requires delta_bit_width_ <= 57 so that shift + width fits in 64 bits.
+  template <typename T>
+  void DecodeBlock(const DecodeData* offsets, const uint8_t* data, T* buffer) {
+    const int width = delta_bit_width_;
+    // Low `width` bits set; written so width 0 does not shift by 64 (UB).
+    const uint64_t mask = width >= 64 ? ~static_cast<uint64_t>(0)
+                                      : (static_cast<uint64_t>(1) << width) - 1;
+    for (int i = 0; i < 32; ++i) {
+      const uint8_t* src = data + offsets[i].byte_offset;
+      const int shift = offsets[i].shift;
+      // The value occupies bits [shift, shift + width) relative to src.
+      int num_bytes = (shift + width + 7) / 8;
+      if (num_bytes > 8) num_bytes = 8;  // keep the byte shifts below 64
+      uint64_t word = 0;
+      for (int b = 0; b < num_bytes; ++b) {
+        word |= static_cast<uint64_t>(src[b]) << (8 * b);
+      }
+      buffer[i] = static_cast<T>((word >> shift) & mask);
+    }
+  }
+
+  // Fills data[0..31] with the location of each value in a 32-value miniblock
+  // packed at `bit_width` bits per value: the byte holding the value's first
+  // bit and that bit's position inside the byte.
+  static void Init(DecodeData* data, int bit_width) {
+    for (int i = 0; i < 32; ++i) {
+      const int first_bit = i * bit_width;
+      DecodeData& entry = data[i];
+      entry.byte_offset = first_bit / 8;
+      entry.shift = first_bit % 8;
+    }
+  }
};
}
diff --git a/src/encodings/delta-byte-array-encoding.h b/src/encodings/delta-byte-array-encoding.h
index cdbbfde..b629c15 100644
--- a/src/encodings/delta-byte-array-encoding.h
+++ b/src/encodings/delta-byte-array-encoding.h
@@ -15,7 +15,8 @@
#ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
#define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
-#include "encodings.h"
+#include "encodings/delta-bit-pack-encoding.h"
+#include "encodings/delta-length-byte-array-encoding.h"
namespace parquet_cpp {
diff --git a/src/encodings/encodings.h b/src/encodings/encodings.h
index e888c1f..e0b162a 100644
--- a/src/encodings/encodings.h
+++ b/src/encodings/encodings.h
@@ -22,6 +22,8 @@
#include "impala/rle-encoding.h"
#include "impala/bit-stream-utils.inline.h"
+#include "parquet/parquet.h"
+
namespace parquet_cpp {
class Decoder {
@@ -72,12 +74,5 @@
}
-#include "bool-encoding.h"
-#include "plain-encoding.h"
-#include "dictionary-encoding.h"
-#include "delta-bit-pack-encoding.h"
-#include "delta-length-byte-array-encoding.h"
-#include "delta-byte-array-encoding.h"
-
#endif
diff --git a/src/impala/bit-stream-utils.h b/src/impala/bit-stream-utils.h
index 5eba254..e10b5b9 100644
--- a/src/impala/bit-stream-utils.h
+++ b/src/impala/bit-stream-utils.h
@@ -125,6 +125,13 @@
// there may be an additional fraction of a byte).
int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+  // Returns a pointer to the byte at byte_offset_. bit_offset_ is not applied,
+  // so the pointer addresses the next unread value only when bit_offset_ == 0.
+  const uint8_t* current_ptr() const { return buffer_ + byte_offset_; }
+  // Advances byte_offset_ by num_bytes and reloads buffered_values_ with up to
+  // 8 bytes starting at the new position. bit_offset_ is not modified.
+  // Skipping past the end clamps byte_offset_ to max_bytes_, so bytes_left()
+  // reports no remaining data. Previously this passed a negative length to
+  // memcpy in that case.
+  void SkipBytes(int num_bytes) {
+    byte_offset_ = std::min(byte_offset_ + num_bytes, max_bytes_);
+    // Clear first so a short tail (< 8 bytes) does not leave stale bytes from
+    // the previously buffered word in the upper part of buffered_values_.
+    buffered_values_ = 0;
+    const int bytes_to_load = std::min(8, max_bytes_ - byte_offset_);
+    if (bytes_to_load > 0) {
+      memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_to_load);
+    }
+  }
+
// Maximum byte length of a vlq encoded int
static const int MAX_VLQ_BYTE_LEN = 5;
diff --git a/src/parquet.cc b/src/parquet.cc
index 67d9caf..0404e66 100644
--- a/src/parquet.cc
+++ b/src/parquet.cc
@@ -13,8 +13,11 @@
// limitations under the License.
#include "parquet/parquet.h"
-#include "encodings/encodings.h"
#include "compression/codec.h"
+#include "encodings/bool-encoding.h"
+#include "encodings/encodings.h"
+#include "encodings/dictionary-encoding.h"
+#include "encodings/plain-encoding.h"
#include <string>
#include <string.h>