blob: 243064e214c559073e23248ad08d13537ae026e4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_PARQUET_COMMON_H
#define IMPALA_EXEC_PARQUET_COMMON_H
#include "common/compiler-util.h"
#include "gen-cpp/Descriptors_types.h"
#include "gen-cpp/parquet_types.h"
#include "runtime/date-value.h"
#include "runtime/decimal-value.h"
#include "runtime/string-value.h"
#include "runtime/timestamp-value.inline.h"
#include "util/bit-util.h"
#include "util/decimal-util.h"
#include "util/mem-util.h"
/// This file contains common elements between the parquet Writer and Scanner.
namespace impala {
const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
const uint32_t PARQUET_CURRENT_VERSION = 1;
/// Struct that specifies an inclusive range of rows.
struct RowRange {
int64_t first;
int64_t last;
};
inline bool operator==(const RowRange& lhs, const RowRange& rhs) {
return lhs.first == rhs.first && lhs.last == rhs.last;
}
inline bool operator<(const RowRange& lhs, const RowRange& rhs) {
return std::tie(lhs.first, lhs.last) < std::tie(rhs.first, rhs.last);
}
/// Return the Impala compression type for the given Parquet codec. The caller must
/// validate that the codec is a supported one, otherwise this will DCHECK.
THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec);
/// Return the Parquet code for the given Impala compression type. The caller must
/// validate that the codec is a supported one, otherwise this will DCHECK.
parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
THdfsCompression::type codec);
/// Returns the row range for the given page idx using information from the row group and
/// offset index.
void GetRowRangeForPage(const parquet::RowGroup& row_group,
const parquet::OffsetIndex& offset_index, int page_idx, RowRange* row_range);
/// Given a column chunk containing rows in the range [0, 'num_rows'), 'skip_ranges'
/// contains the row ranges we are not interested in. 'skip_ranges' can be redundant and
/// can potentially contain ranges that intersect with each other. As a side-effect, this
/// function sorts 'skip_ranges'.
/// 'candidate_ranges' will contain the set of row ranges that we want to scan.
/// Returns false if the input data is invalid.
bool ComputeCandidateRanges(const int64_t num_rows, std::vector<RowRange>* skip_ranges,
std::vector<RowRange>* candidate_ranges);
/// This function computes the pages that intersect with 'candidate_ranges'. I.e. it
/// determines the pages that we actually need to read from a given column chunk.
/// 'candidate_pages' will hold the indexes of such pages.
/// Returns true on success, false otherwise.
bool ComputeCandidatePages(
const std::vector<parquet::PageLocation>& page_locations,
const std::vector<RowRange>& candidate_ranges,
const int64_t num_rows, std::vector<int>* candidate_pages);
/// The plain encoding does not maintain any state so all these functions
/// are static helpers.
/// TODO: we are using templates to provide a generic interface (over the
/// types) to avoid performance penalties. This makes the code more complex
/// and should be removed when we have codegen support to inline virtual
/// calls.
class ParquetPlainEncoder {
public:
/// Returns the byte size of 'v' where InternalType is the datatype that Impala uses
/// internally to store tuple data. Used in some template function implementations to
/// determine the encoded byte size for fixed-length types.
template <typename InternalType>
static int ByteSize(const InternalType& v) { return sizeof(InternalType); }
/// Returns the encoded size of values of type t. Returns -1 if it is variable
/// length. This can be different than the slot size of the types.
static int EncodedByteSize(const ColumnType& t) {
switch (t.type) {
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR:
// CHAR is varlen here because we don't write the padding to the file
return -1;
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT:
case TYPE_DATE:
case TYPE_FLOAT:
return 4;
case TYPE_BIGINT:
case TYPE_DOUBLE:
return 8;
case TYPE_TIMESTAMP:
return 12;
case TYPE_DECIMAL:
return DecimalSize(t);
case TYPE_NULL:
case TYPE_BOOLEAN: // These types are not plain encoded.
default:
DCHECK(false);
return -1;
}
}
/// Returns the byte size of a value of Parqet type `type`. If `type` is
/// FIXED_LEN_BYTE_ARRAY, the argument `fixed_len_size` is returned.
/// The type must be one of INT32, INT64, INT96, FLOAT, DOUBLE or FIXED_LEN_BYTE_ARRAY.
/// BOOLEANs are not plain encoded and BYTE_ARRAYs do not have a fixed size, therefore
/// they are not supported.
static ALWAYS_INLINE int EncodedByteSize(const parquet::Type::type type,
const int fixed_len_size) {
switch (type) {
case parquet::Type::type::INT32: return sizeof(int32_t);
case parquet::Type::type::INT64: return sizeof(int64_t);
case parquet::Type::type::INT96: return 12;
case parquet::Type::type::FLOAT: return sizeof(float);
case parquet::Type::type::DOUBLE: return sizeof(double);
case parquet::Type::type::FIXED_LEN_BYTE_ARRAY: return fixed_len_size;
default: DCHECK(false);
};
return -1;
}
/// The minimum byte size to store decimals of with precision t.precision.
static int DecimalSize(const ColumnType& t) {
DCHECK(t.type == TYPE_DECIMAL);
// Numbers in the comment is the max positive value that can be represented
// with those number of bits (max negative is -(X + 1)).
// TODO: use closed form for this?
switch (t.precision) {
case 1: case 2:
return 1; // 127
case 3: case 4:
return 2; // 32,767
case 5: case 6:
return 3; // 8,388,607
case 7: case 8: case 9:
return 4; // 2,147,483,427
case 10: case 11:
return 5; // 549,755,813,887
case 12: case 13: case 14:
return 6; // 140,737,488,355,327
case 15: case 16:
return 7; // 36,028,797,018,963,967
case 17: case 18:
return 8; // 9,223,372,036,854,775,807
case 19: case 20: case 21:
return 9; // 2,361,183,241,434,822,606,847
case 22: case 23:
return 10; // 604,462,909,807,314,587,353,087
case 24: case 25: case 26:
return 11; // 154,742,504,910,672,534,362,390,527
case 27: case 28:
return 12; // 39,614,081,257,132,168,796,771,975,167
case 29: case 30: case 31:
return 13; // 10,141,204,801,825,835,211,973,625,643,007
case 32: case 33:
return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
case 34: case 35:
return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
case 36: case 37: case 38:
return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
default:
DCHECK(false);
break;
}
return -1;
}
/// Encodes t into buffer. Returns the number of bytes added. buffer must
/// be preallocated and big enough. Buffer need not be aligned.
/// 'fixed_len_size' is only applicable for data encoded using FIXED_LEN_BYTE_ARRAY and
/// is the number of bytes the plain encoder should use.
template <typename InternalType>
static int Encode(const InternalType& t, int fixed_len_size, uint8_t* buffer) {
memcpy(buffer, &t, ByteSize(t));
return ByteSize(t);
}
template <typename InternalType>
static int DecodeByParquetType(const uint8_t* buffer, const uint8_t* buffer_end,
int fixed_len_size, InternalType* v, parquet::Type::type parquet_type) {
switch (parquet_type) {
case parquet::Type::BOOLEAN:
return ParquetPlainEncoder::Decode<InternalType, parquet::Type::BOOLEAN>(buffer,
buffer_end, fixed_len_size, v);
case parquet::Type::INT32:
return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT32>(buffer,
buffer_end, fixed_len_size, v);
case parquet::Type::INT64:
return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT64>(buffer,
buffer_end, fixed_len_size, v);
case parquet::Type::INT96:
return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT96>(buffer,
buffer_end, fixed_len_size, v);
case parquet::Type::FLOAT:
return ParquetPlainEncoder::Decode<InternalType, parquet::Type::FLOAT>(buffer,
buffer_end, fixed_len_size, v);
case parquet::Type::DOUBLE:
return ParquetPlainEncoder::Decode<InternalType, parquet::Type::DOUBLE>(buffer,
buffer_end, fixed_len_size, v);
case parquet::Type::BYTE_ARRAY:
return ParquetPlainEncoder::Decode<InternalType,
parquet::Type::BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
return ParquetPlainEncoder::Decode<InternalType,
parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
default:
DCHECK(false) << "Unexpected physical type";
return -1;
}
}
/// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer' need
/// not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size' is the
/// size of the object. Otherwise, it is unused. Returns the number of bytes read or -1
/// if the value was not decoded successfully.
/// This generic template function is used when PARQUET_TYPE is one of INT32, INT64,
/// INT96, FLOAT, DOUBLE or FIXED_LEN_BYTE_ARRAY.
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
InternalType* v) {
/// We cannot make it a static assert because the DecodeByParquetType template
/// potentially calls this function with every combination of internal and parquet
/// types, not only the valid ones. The invalid combinations do not occur at runtime,
/// but they cannot be ruled out at compile time.
DCHECK(PARQUET_TYPE == parquet::Type::type::INT32
|| PARQUET_TYPE == parquet::Type::type::INT64
|| PARQUET_TYPE == parquet::Type::type::INT96
|| PARQUET_TYPE == parquet::Type::type::FLOAT
|| PARQUET_TYPE == parquet::Type::type::DOUBLE
|| PARQUET_TYPE == parquet::Type::type::FIXED_LEN_BYTE_ARRAY);
int byte_size = EncodedByteSize(PARQUET_TYPE, fixed_len_size);
if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
DecodeNoBoundsCheck<InternalType, PARQUET_TYPE>(buffer, buffer_end,
fixed_len_size, v);
return byte_size;
}
/// Returns the byte size of the encoded data when PLAIN encoding is used.
/// Returns -1 if the encoded data passes the end of the buffer.
template <parquet::Type::type PARQUET_TYPE>
static int64_t EncodedLen(const uint8_t* buffer, const uint8_t* buffer_end,
int fixed_len_size, int32_t num_values) {
using parquet::Type;
int byte_size = 0;
switch (PARQUET_TYPE) {
case Type::INT32: byte_size = 4; break;
case Type::INT64: byte_size = 8; break;
case Type::INT96: byte_size = 12; break;
case Type::FLOAT: byte_size = 4; break;
case Type::DOUBLE: byte_size = 8; break;
case Type::FIXED_LEN_BYTE_ARRAY: byte_size = fixed_len_size; break;
default:
DCHECK(false);
return -1;
}
int64_t encoded_len = byte_size * num_values;
return encoded_len > buffer_end - buffer ? -1 : encoded_len;
}
/// Batched version of Decode() that tries to decode 'num_values' values from the memory
/// range [buffer, buffer_end) and writes them to 'v' with a stride of 'stride' bytes.
/// Returns the number of bytes read from 'buffer' or -1 if there was an error
/// decoding, e.g. invalid data or running out of input data before reading
/// 'num_values'.
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
static int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
int fixed_len_size, int64_t num_values, int64_t stride, InternalType* v);
private:
/// Decode values without bounds checking. `buffer_end` is only used for DCHECKs in
/// DEBUG mode, it is unused in RELEASE mode.
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
static ALWAYS_INLINE inline void DecodeNoBoundsCheck(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, InternalType* v);
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
static int64_t DecodeBatchAlwaysBoundsCheck(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
InternalType* v);
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
static int64_t DecodeBatchOneBoundsCheck(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
InternalType* v);
};
/// Calling this with arguments of type ColumnType is certainly a programmer error, so we
/// disallow it.
template <> int ParquetPlainEncoder::ByteSize(const ColumnType& t);
/// Disable for bools. Plain encoding is not used for booleans.
template <> int ParquetPlainEncoder::ByteSize(const bool& b);
template <> int ParquetPlainEncoder::Encode(const bool&, int fixed_len_size, uint8_t*);
template <> int ParquetPlainEncoder::Decode<bool, parquet::Type::BOOLEAN>(const uint8_t*,
const uint8_t*, int fixed_len_size, bool* v);
template <>
inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
// Only used when the decimal is stored as INT32.
return sizeof(Decimal4Value::StorageType);
}
template <>
inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) {
// Only used when the decimal is stored as INT64.
return sizeof(Decimal8Value::StorageType);
}
template <>
inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) {
// Not used, since such big decimals can only be stored as BYTE_ARRAY or
// FIXED_LEN_BYTE_ARRAY.
DCHECK(false);
return -1;
}
/// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit.
template <>
inline int ParquetPlainEncoder::ByteSize(const int8_t& v) { return sizeof(int32_t); }
template <>
inline int ParquetPlainEncoder::ByteSize(const int16_t& v) { return sizeof(int32_t); }
template <>
inline int ParquetPlainEncoder::ByteSize(const StringValue& v) {
return sizeof(int32_t) + v.len;
}
template <>
inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
return 12;
}
/// Returns the byte size of the encoded data when PLAIN encoding is used.
/// Returns -1 if the encoded data passes the end of the buffer.
template <>
inline int64_t ParquetPlainEncoder::EncodedLen<parquet::Type::BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
int32_t num_values) {
const uint8_t* orig_buffer = buffer;
int64_t values_remaining = num_values;
while (values_remaining > 0) {
if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
int32_t str_len;
memcpy(&str_len, buffer, sizeof(int32_t));
str_len += sizeof(int32_t);
if (UNLIKELY(str_len < sizeof(int32_t) || buffer_end - buffer < str_len)) return -1;
buffer += str_len;
--values_remaining;
}
return buffer - orig_buffer;
}
template <typename From, typename To>
inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* buffer_end, To* v) {
int byte_size = sizeof(From);
DCHECK_GE(buffer_end - buffer, byte_size);
From dest;
memcpy(&dest, buffer, byte_size);
*v = dest;
return byte_size;
}
/// Decodes a value without bounds checking.
/// This generic template function is used with the following types:
/// =============================
/// InternalType | PARQUET_TYPE
/// =============================
/// int8_t | INT32
/// int16_t | INT32
/// int32_t | INT32
/// int64_t | INT64
/// float | FLOAT
/// double | DOUBLE
/// Decimal4Value | INT32
/// Decimal8Value | INT64
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
void ParquetPlainEncoder::DecodeNoBoundsCheck(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, InternalType* v) {
int byte_size = EncodedByteSize(PARQUET_TYPE, -1);
DCHECK_GE(buffer_end - buffer, byte_size);
/// This generic template is only used when either no conversion is needed or with
/// narrowing integer conversions (e.g. int32_t to int16_t or int8_t) where copying the
/// lower bytes is the correct conversion.
memcpy(v, buffer, sizeof(InternalType));
}
template <>
inline void ParquetPlainEncoder::
DecodeNoBoundsCheck<TimestampValue, parquet::Type::INT96>(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, TimestampValue* v) {
int byte_size = EncodedByteSize(parquet::Type::INT96, -1);
DCHECK_GE(buffer_end - buffer, byte_size);
/// We copy only 12 bytes from the input buffer but the destination is 16 bytes long
/// because of padding. The most significant 4 bits remain uninitialized as they are not
/// used.
memcpy(v, buffer, byte_size);
}
template <>
inline void ParquetPlainEncoder::DecodeNoBoundsCheck<int64_t, parquet::Type::INT32>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int64_t* v) {
DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
}
template <>
inline void ParquetPlainEncoder::DecodeNoBoundsCheck<double, parquet::Type::INT32>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) {
DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
}
template <>
inline void ParquetPlainEncoder::DecodeNoBoundsCheck<double, parquet::Type::FLOAT>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) {
DecodeWithConversion<float, double>(buffer, buffer_end, v);
}
template <>
inline int ParquetPlainEncoder::Decode<DateValue, parquet::Type::type::INT32>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, DateValue* v) {
int32_t val;
if (UNLIKELY(buffer_end - buffer < sizeof(val))) return -1;
memcpy(&val, buffer, sizeof(val));
// DateValue constructor validates int32_t date value
*v = DateValue(val);
return sizeof(val);
}
template<typename T>
inline int EncodeToInt32(const T& v, int fixed_len_size, uint8_t* buffer) {
int32_t val = v;
memcpy(buffer, &val, sizeof(int32_t));
return ParquetPlainEncoder::ByteSize(v);
}
template <>
inline int ParquetPlainEncoder::Encode(
const int8_t& v, int fixed_len_size, uint8_t* buffer) {
return EncodeToInt32(v, fixed_len_size, buffer);
}
template <>
inline int ParquetPlainEncoder::Encode(
const int16_t& v, int fixed_len_size, uint8_t* buffer) {
return EncodeToInt32(v, fixed_len_size, buffer);
}
template <>
inline int ParquetPlainEncoder::Encode(
const StringValue& v, int fixed_len_size, uint8_t* buffer) {
memcpy(buffer, &v.len, sizeof(int32_t));
memcpy(buffer + sizeof(int32_t), v.ptr, v.len);
return ByteSize(v);
}
template <>
inline int ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
StringValue* v) {
if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
memcpy(&v->len, buffer, sizeof(int32_t));
int byte_size = ByteSize(*v);
if (UNLIKELY(v->len < 0 || buffer_end - buffer < byte_size)) return -1;
v->ptr = reinterpret_cast<char*>(const_cast<uint8_t*>(buffer)) + sizeof(int32_t);
if (fixed_len_size > 0) v->len = std::min(v->len, fixed_len_size);
// we still read byte_size bytes, even if we truncate
return byte_size;
}
/// Write decimals as big endian (byte comparable) to benefit from common prefixes.
/// fixed_len_size can be less than sizeof(Decimal*Value) for space savings. This means
/// that the value in the in-memory format has leading zeros or negative 1's.
/// For example, precision 2 fits in 1 byte. All decimals stored as Decimal4Value
/// will have 3 bytes of leading zeros, we will only store the interesting byte.
template<typename T>
inline int EncodeDecimal(const T& v, int fixed_len_size, uint8_t* buffer) {
DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
return fixed_len_size;
}
template <>
inline int ParquetPlainEncoder::Encode(
const Decimal4Value& v, int fixed_len_size, uint8_t* buffer) {
return EncodeDecimal(v, fixed_len_size, buffer);
}
template <>
inline int ParquetPlainEncoder::Encode(
const Decimal8Value& v, int fixed_len_size, uint8_t* buffer) {
return EncodeDecimal(v, fixed_len_size, buffer);
}
template <>
inline int ParquetPlainEncoder::Encode(
const Decimal16Value& v, int fixed_len_size, uint8_t* buffer) {
return EncodeDecimal(v, fixed_len_size, buffer);
}
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
inline int64_t ParquetPlainEncoder::DecodeBatchAlwaysBoundsCheck(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
InternalType* v) {
const uint8_t* buffer_pos = buffer;
StrideWriter<InternalType> out(v, stride);
for (int64_t i = 0; i < num_values; ++i) {
int encoded_len = Decode<InternalType, PARQUET_TYPE>(
buffer_pos, buffer_end, fixed_len_size, out.Advance());
if (UNLIKELY(encoded_len < 0)) return -1;
buffer_pos += encoded_len;
}
return buffer_pos - buffer;
}
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
inline int64_t ParquetPlainEncoder::DecodeBatchOneBoundsCheck(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
InternalType* v) {
const uint8_t* buffer_pos = buffer;
uint8_t* output = reinterpret_cast<uint8_t*>(v);
const int byte_size_of_element = EncodedByteSize(PARQUET_TYPE, fixed_len_size);
if (UNLIKELY(buffer_end - buffer < num_values * byte_size_of_element)) return -1;
/// We unroll the loop manually in batches of 8.
constexpr int batch = 8;
const int full_batches = num_values / batch;
const int remainder = num_values % batch;
for (int b = 0; b < full_batches; b++) {
#pragma push_macro("DECODE_NO_CHECK_UNROLL")
#define DECODE_NO_CHECK_UNROLL(ignore1, i, ignore2) \
DecodeNoBoundsCheck<InternalType, PARQUET_TYPE>( \
buffer_pos + i * byte_size_of_element, buffer_end, fixed_len_size, \
reinterpret_cast<InternalType*>(output + i * stride));
BOOST_PP_REPEAT_FROM_TO(0, 8 /* The value of `batch` */,
DECODE_NO_CHECK_UNROLL, ignore);
#pragma pop_macro("DECODE_NO_CHECK_UNROLL")
output += batch * stride;
buffer_pos += batch * byte_size_of_element;
}
StrideWriter<InternalType> out(reinterpret_cast<InternalType*>(output), stride);
for (int i = 0; i < remainder; i++) {
DecodeNoBoundsCheck<InternalType, PARQUET_TYPE>(
buffer_pos, buffer_end, fixed_len_size, out.Advance());
buffer_pos += byte_size_of_element;
}
DCHECK_EQ(buffer_pos - buffer, num_values * byte_size_of_element);
return buffer_pos - buffer;
}
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
InternalType* v) {
/// Whether bounds checking needs to be done for every element or we can check the whole
/// batch at the same time.
constexpr bool has_variable_length =
PARQUET_TYPE == parquet::Type::type::BYTE_ARRAY;
if (has_variable_length) {
return DecodeBatchAlwaysBoundsCheck<InternalType, PARQUET_TYPE>(buffer, buffer_end,
fixed_len_size, num_values, stride, v);
} else {
return DecodeBatchOneBoundsCheck<InternalType, PARQUET_TYPE>(buffer, buffer_end,
fixed_len_size, num_values, stride, v);
}
}
template <typename T>
inline void DecodeDecimalFixedLen(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, T* v) {
DCHECK_GE(buffer_end - buffer, fixed_len_size);
DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
}
template <>
inline int ParquetPlainEncoder::
Decode<bool, parquet::Type::BOOLEAN>(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, bool* v) {
DCHECK(false) << "Use ParquetBoolDecoder for decoding bools";
return -1;
}
template <>
inline void ParquetPlainEncoder::
DecodeNoBoundsCheck<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
Decimal4Value* v) {
DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
}
template <>
inline void ParquetPlainEncoder::
DecodeNoBoundsCheck<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
Decimal8Value* v) {
DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
}
template <>
inline void ParquetPlainEncoder::
DecodeNoBoundsCheck<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
Decimal16Value* v) {
DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
}
/// Helper method to decode Decimal type stored as variable length byte array.
template<typename T>
inline int DecodeDecimalByteArray(const uint8_t* buffer, const uint8_t* buffer_end,
int fixed_len_size, T* v) {
if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
int encoded_byte_size;
memcpy(&encoded_byte_size, buffer, sizeof(int32_t));
int byte_size = sizeof(int32_t) + encoded_byte_size;
if (UNLIKELY(encoded_byte_size < 0 || buffer_end - buffer < byte_size)) return -1;
uint8_t* val_ptr = const_cast<uint8_t*>(buffer) + sizeof(int32_t);
DecimalUtil::DecodeFromFixedLenByteArray(val_ptr, encoded_byte_size, v);
return byte_size;
}
template <>
inline int ParquetPlainEncoder::Decode<Decimal4Value, parquet::Type::BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
Decimal4Value* v) {
return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
}
template <>
inline int ParquetPlainEncoder::Decode<Decimal8Value, parquet::Type::BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
Decimal8Value* v) {
return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
}
template <>
inline int ParquetPlainEncoder::Decode<Decimal16Value, parquet::Type::BYTE_ARRAY>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
Decimal16Value* v) {
return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
}
/// Helper class that contains the parameters needed for Timestamp decoding.
/// Can be safely passed by value.
class ParquetTimestampDecoder {
public:
ParquetTimestampDecoder() {}
ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* timezone,
bool convert_int96_timestamps);
bool NeedsConversion() const { return timezone_ != nullptr; }
/// Decodes next PARQUET_TYPE from 'buffer', reading up to the byte before 'buffer_end'
/// and converts it TimestampValue. 'buffer' need not be aligned.
template <parquet::Type::type PARQUET_TYPE>
int Decode(const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const;
/// Batched version of Decode() that tries to decode 'num_values' values from the memory
/// range [buffer, buffer_end) and writes them to 'v' with a stride of 'stride' bytes.
/// Returns the number of bytes read from 'buffer' or -1 if there was an error
/// decoding, e.g. invalid data or running out of input data before reading
/// 'num_values'.
template <parquet::Type::type PARQUET_TYPE>
int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
int64_t num_values, int64_t stride, TimestampValue* v);
TimestampValue Int64ToTimestampValue(int64_t unix_time) const {
switch (precision_) {
case MILLI:
return TimestampValue::UtcFromUnixTimeMillis(unix_time);
case MICRO:
return TimestampValue::UtcFromUnixTimeMicros(unix_time);
case NANO:
return TimestampValue::UtcFromUnixTimeLimitedRangeNanos(unix_time);
default:
DCHECK(false);
return TimestampValue();
}
}
void ConvertToLocalTime(TimestampValue* v) const {
DCHECK(timezone_ != nullptr);
if (v->HasDateAndTime()) v->UtcToLocal(*timezone_);
}
/// Timezone conversion of min/max stats need some extra logic because UTC->local
/// conversion can change ordering near timezone rule changes. The max value is
/// increased and min value is decreased to avoid incorrectly dropping column chunks
/// (or pages once IMPALA-5843 is ready).
/// If timestamp t >= v before conversion, then this function converts v in such a
/// way that the same will be true after t is converted.
void ConvertMinStatToLocalTime(TimestampValue* v) const;
/// If timestamp t <= v before conversion, then this function converts v in such a
/// way that the same will be true after t is converted.
void ConvertMaxStatToLocalTime(TimestampValue* v) const;
enum Precision { MILLI, MICRO, NANO };
/// Processes the Parquet schema element 'e' and extracts timestamp related parameters.
/// Returns true if the schema describes a valid timestamp column.
/// 'precision': unit of the timestamp
/// 'needs_conversion': whether utc->local conversion is necessary
static bool GetTimestampInfoFromSchema(const parquet::SchemaElement& e,
Precision& precision, bool& needs_conversion);
private:
/// Timezone used for UTC->Local conversions. If nullptr, no conversion is needed.
const Timezone* timezone_ = nullptr;
/// Unit of the encoded timestamp. Used to decide between milli and microseconds during
/// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned
/// to be implemented once it is added in Parquet (PARQUET-1387).
Precision precision_ = NANO;
};
template <>
inline int ParquetTimestampDecoder::Decode<parquet::Type::INT64>(
const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const {
int64_t unix_time;
int bytes_read = ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT64>(
buffer, buffer_end, 0, &unix_time);
if (UNLIKELY(bytes_read < 0)) {
return bytes_read;
}
*v = Int64ToTimestampValue(unix_time);
// TODO: It would be more efficient to do the timezone conversion in the same step
// as the int64_t -> TimestampValue conversion. This would be also needed to
// move conversion/validation to dictionary construction (IMPALA-4994) and to
// implement dictionary filtering for TimestampValues.
return bytes_read;
}
template <>
inline int ParquetTimestampDecoder::Decode<parquet::Type::INT96>(
const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const {
DCHECK_EQ(precision_, NANO);
return ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(
buffer, buffer_end, 0, v);
}
template <parquet::Type::type PARQUET_TYPE>
inline int64_t ParquetTimestampDecoder::DecodeBatch(const uint8_t* buffer,
const uint8_t* buffer_end, int64_t num_values, int64_t stride,
TimestampValue* v) {
const uint8_t* buffer_pos = buffer;
StrideWriter<TimestampValue> out(v, stride);
for (int64_t i = 0; i < num_values; ++i) {
int encoded_len = Decode<PARQUET_TYPE>(buffer_pos, buffer_end, out.Advance());
if (UNLIKELY(encoded_len < 0)) return -1;
buffer_pos += encoded_len;
}
return buffer_pos - buffer;
}
}
#endif