// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/parquet/parquet-common.h"
namespace impala {
/// Mapping of Parquet codec enums to Impala enums
const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
  THdfsCompression::NONE,
  THdfsCompression::SNAPPY,
  THdfsCompression::GZIP,
  THdfsCompression::LZO,
  THdfsCompression::BROTLI,
  THdfsCompression::LZ4_BLOCKED,
  THdfsCompression::ZSTD
};

const int PARQUET_TO_IMPALA_CODEC_SIZE =
    sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]);

/// Mapping of Impala codec enums to Parquet enums
const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
  parquet::CompressionCodec::UNCOMPRESSED,
  parquet::CompressionCodec::SNAPPY, // DEFAULT
  parquet::CompressionCodec::GZIP, // GZIP
  parquet::CompressionCodec::GZIP, // DEFLATE
  // Placeholder for BZIP2 which isn't a valid parquet codec.
  parquet::CompressionCodec::SNAPPY, // BZIP2
  parquet::CompressionCodec::SNAPPY, // SNAPPY
  parquet::CompressionCodec::SNAPPY, // SNAPPY_BLOCKED
  parquet::CompressionCodec::LZO,
  parquet::CompressionCodec::LZ4,
  parquet::CompressionCodec::GZIP, // ZLIB
  parquet::CompressionCodec::ZSTD,
  parquet::CompressionCodec::BROTLI,
  parquet::CompressionCodec::LZ4 // LZ4_BLOCKED
};

const int IMPALA_TO_PARQUET_CODEC_SIZE =
    sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);

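/// Converts a Parquet compression codec enum to the corresponding Impala codec using
/// the PARQUET_TO_IMPALA_CODEC table above.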
THdfsCompression::type ConvertParquetToImpalaCodec(
    parquet::CompressionCodec::type codec) {
  DCHECK_GE(codec, 0);
  DCHECK_LT(codec, PARQUET_TO_IMPALA_CODEC_SIZE);
  return PARQUET_TO_IMPALA_CODEC[codec];
}

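/// Converts an Impala compression codec enum to the corresponding Parquet codec using
/// the IMPALA_TO_PARQUET_CODEC table above.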
parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
    THdfsCompression::type codec) {
  DCHECK_GE(codec, 0);
  DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE);
  return IMPALA_TO_PARQUET_CODEC[codec];
}

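/// Computes the range of rows covered by the page with index 'page_idx', based on the
/// first row indexes stored in the offset index. The last page extends to the last row
/// of the row group.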
void GetRowRangeForPage(const parquet::RowGroup& row_group,
    const parquet::OffsetIndex& offset_index, int page_idx, RowRange* row_range) {
  const auto& page_locations = offset_index.page_locations;
  DCHECK_LT(page_idx, page_locations.size());
  row_range->first = page_locations[page_idx].first_row_index;
  if (page_idx == page_locations.size() - 1) {
    row_range->last = row_group.num_rows - 1;
  } else {
    row_range->last = page_locations[page_idx + 1].first_row_index - 1;
  }
}

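/// Returns true if every range in 'skip_ranges' is well-formed, i.e. its bounds are
/// ordered and fall inside [0, num_rows).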
static bool ValidateRowRangesData(const vector<RowRange>& skip_ranges,
    const int64_t num_rows) {
  for (auto& range : skip_ranges) {
    if (range.first > range.last || range.first < 0 || range.last >= num_rows) {
      return false;
    }
  }
  return true;
}

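/// Computes the complement of 'skip_ranges' over [0, num_rows): the resulting
/// 'candidate_ranges' are the row ranges not covered by any skip range. Sorts
/// 'skip_ranges' as a side effect. Returns false if 'skip_ranges' contains invalid data.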
bool ComputeCandidateRanges(const int64_t num_rows, vector<RowRange>* skip_ranges,
    vector<RowRange>* candidate_ranges) {
  if (!ValidateRowRangesData(*skip_ranges, num_rows)) return false;
  sort(skip_ranges->begin(), skip_ranges->end());
  candidate_ranges->clear();
  // 'skip_end' tracks the end of a continuous range of rows that needs to be skipped.
  // 'skip_ranges' is sorted, so we can process it from the beginning.
  int64_t skip_end = -1;
  for (auto& skip_range : *skip_ranges) {
    if (skip_end + 1 >= skip_range.first) {
      // We can extend 'skip_end' to the end of 'skip_range'.
      if (skip_end < skip_range.last) skip_end = skip_range.last;
    } else {
      // We found a gap in 'skip_ranges', i.e. a row range that is not covered by
      // 'skip_ranges'.
      candidate_ranges->push_back({skip_end + 1, skip_range.first - 1});
      // Track the end of the next continuous range that needs to be skipped.
      skip_end = skip_range.last;
    }
  }
  // If the last skipped range ends before the last row, add the remaining rows to
  // 'candidate_ranges'.
  if (skip_end < num_rows - 1) {
    candidate_ranges->push_back({skip_end + 1, num_rows - 1});
  }
  return true;
}

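/// Returns true if the page locations are valid: every 'first_row_index' falls inside
/// [0, num_rows) and the values are strictly increasing across pages.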
static bool ValidatePageLocations(const vector<parquet::PageLocation>& page_locations,
    const int64_t num_rows) {
  for (int i = 0; i < page_locations.size(); ++i) {
    auto& page_loc = page_locations[i];
    if (page_loc.first_row_index < 0 || page_loc.first_row_index >= num_rows) {
      return false;
    }
    if (i + 1 < page_locations.size()) {
      auto& next_page_loc = page_locations[i + 1];
      if (page_loc.first_row_index >= next_page_loc.first_row_index) return false;
    }
  }
  return true;
}

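/// Returns true if the two (inclusive) row ranges have at least one row in common.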
static bool RangesIntersect(const RowRange& lhs, const RowRange& rhs) {
  int64_t higher_first = std::max(lhs.first, rhs.first);
  int64_t lower_last = std::min(lhs.last, rhs.last);
  return higher_first <= lower_last;
}

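/// Selects the pages that overlap any of the 'candidate_ranges' and appends their
/// indexes to 'candidate_pages'. Both 'page_locations' and 'candidate_ranges' are
/// expected to be ordered by row index. Returns false if the page locations are
/// invalid, or if no page matches a non-empty set of candidate ranges.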
bool ComputeCandidatePages(
    const vector<parquet::PageLocation>& page_locations,
    const vector<RowRange>& candidate_ranges,
    const int64_t num_rows, vector<int>* candidate_pages) {
  if (!ValidatePageLocations(page_locations, num_rows)) return false;
  int range_idx = 0;
  for (int i = 0; i < page_locations.size(); ++i) {
    auto& page_location = page_locations[i];
    int64_t page_start = page_location.first_row_index;
    int64_t page_end = i != page_locations.size() - 1 ?
        page_locations[i + 1].first_row_index - 1 :
        num_rows - 1;
    // Advance to the first candidate range that does not end before this page starts.
    while (range_idx < candidate_ranges.size() &&
        candidate_ranges[range_idx].last < page_start) {
      ++range_idx;
    }
    if (range_idx >= candidate_ranges.size()) break;
    if (RangesIntersect(candidate_ranges[range_idx], {page_start, page_end})) {
      candidate_pages->push_back(i);
    }
  }
  // If there are candidate ranges, we should have found at least one candidate page.
  if (!candidate_ranges.empty() && candidate_pages->empty()) return false;
  return true;
}

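/// Extracts the timestamp precision and whether timezone conversion is needed from the
/// Parquet schema element. Returns false if 'e' does not describe a valid timestamp
/// column.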
bool ParquetTimestampDecoder::GetTimestampInfoFromSchema(const parquet::SchemaElement& e,
    Precision& precision, bool& needs_conversion) {
  if (e.type == parquet::Type::INT96) {
    // Metadata does not contain information about being UTC normalized or not. The
    // caller may override 'needs_conversion' depending on flags and writer.
    needs_conversion = false;
    precision = NANO;
    return true;
  } else if (e.type != parquet::Type::INT64) {
    // Timestamps can only be encoded as INT64 or INT96; return false for other types.
    return false;
  }

  if (e.__isset.logicalType) {
    if (!e.logicalType.__isset.TIMESTAMP) return false;
    // The logical type (introduced in PARQUET-1253) contains explicit information about
    // being UTC normalized or not.
    needs_conversion = e.logicalType.TIMESTAMP.isAdjustedToUTC;
    if (e.logicalType.TIMESTAMP.unit.__isset.MILLIS) {
      precision = ParquetTimestampDecoder::MILLI;
    } else if (e.logicalType.TIMESTAMP.unit.__isset.MICROS) {
      precision = ParquetTimestampDecoder::MICRO;
    } else if (e.logicalType.TIMESTAMP.unit.__isset.NANOS) {
      precision = ParquetTimestampDecoder::NANO;
    } else {
      return false;
    }
  } else if (e.__isset.converted_type) {
    // The converted type does not contain information about being UTC normalized or
    // not. Timestamps with a converted type but without a logical type are/were never
    // written by Impala, so it is assumed that the writer is Parquet-mr and that
    // timezone conversion is needed.
    needs_conversion = true;
    if (e.converted_type == parquet::ConvertedType::TIMESTAMP_MILLIS) {
      precision = ParquetTimestampDecoder::MILLI;
    } else if (e.converted_type == parquet::ConvertedType::TIMESTAMP_MICROS) {
      precision = ParquetTimestampDecoder::MICRO;
    } else {
      // There is no TIMESTAMP_NANOS converted type.
      return false;
    }
  } else {
    // Either the logical or the converted type must be set for INT64 timestamps.
    return false;
  }
  return true;
}

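/// Sets up the decoder for the given schema element: determines the precision and,
/// when a conversion from UTC is required, stores the target timezone.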
ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& e,
    const Timezone* timezone, bool convert_int96_timestamps) {
  bool needs_conversion = false;
  bool valid_schema = GetTimestampInfoFromSchema(e, precision_, needs_conversion);
  DCHECK(valid_schema); // Invalid schemas should be rejected in an earlier step.
  if (e.type == parquet::Type::INT96 && convert_int96_timestamps) {
    needs_conversion = true;
  }
  if (needs_conversion) timezone_ = timezone;
}

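/// Converts a min statistic from UTC to local time. If the converted value falls into
/// a period that repeats in local time (e.g. around a DST change), the start of the
/// repeated period is used, keeping the statistic a safe lower bound.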
void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) const {
  DCHECK(timezone_ != nullptr);
  if (!v->HasDateAndTime()) return;
  TimestampValue repeated_period_start;
  v->UtcToLocal(*timezone_, &repeated_period_start);
  if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
}

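/// Converts a max statistic from UTC to local time. If the converted value falls into
/// a repeated period, the end of the period is used, keeping the statistic a safe
/// upper bound.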
void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) const {
  DCHECK(timezone_ != nullptr);
  if (!v->HasDateAndTime()) return;
  TimestampValue repeated_period_end;
  v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
  if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
}

} // namespace impala