blob: 95920ef2a16006c17b0f65221dd61e2f9fe2cb46 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet_common.h"
#include <glog/logging.h>
#include "common/cast_set.h"
#include "util/simd/bits.h"
#include "vec/core/types.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
const int32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588;
const int64_t ParquetInt96::MICROS_IN_DAY = 86400000000;
const int64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000;
Status FilterMap::init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all) {
_filter_all = filter_all;
_filter_map_data = filter_map_data;
_filter_map_size = filter_map_size;
if (filter_all) {
_has_filter = true;
_filter_ratio = 1;
} else if (filter_map_data == nullptr) {
_has_filter = false;
_filter_ratio = 0;
} else {
size_t filter_count = simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map_data),
filter_map_size);
if (filter_count == filter_map_size) {
_has_filter = true;
_filter_all = true;
_filter_ratio = 1;
} else if (filter_count > 0 && filter_map_size > 0) {
_has_filter = true;
_filter_ratio = (double)filter_count / (double)filter_map_size;
} else {
_has_filter = false;
_filter_ratio = 0;
}
}
return Status::OK();
}
bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_index) {
if (!_has_filter) {
return false;
}
if (_filter_all) {
// all data in normal columns can be skipped when _filter_all = true,
// so the remaining_num_values should be less than the remaining filter map size.
DCHECK_LE(remaining_num_values + filter_map_index, _filter_map_size);
// return true always, to make sure that the data in normal columns can be skipped.
return true;
}
if (remaining_num_values + filter_map_index > _filter_map_size) {
return false;
}
return simd::count_zero_num(
reinterpret_cast<const int8_t*>(_filter_map_data + filter_map_index),
remaining_num_values) == remaining_num_values;
}
Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels,
std::vector<uint8_t>& nested_filter_map_data,
std::unique_ptr<FilterMap>* nested_filter_map,
size_t* current_row_ptr, size_t start_index) const {
if (!has_filter() || filter_all()) {
return Status::InternalError(fmt::format(
"FilterMap::generate_nested_filter_map failed: has_filter={}, filter_all={}",
has_filter(), filter_all()));
}
if (rep_levels.empty()) {
return Status::OK();
}
nested_filter_map_data.resize(rep_levels.size());
size_t current_row = current_row_ptr ? *current_row_ptr : 0;
for (size_t i = start_index; i < rep_levels.size(); i++) {
if (i != start_index && rep_levels[i] == 0) {
current_row++;
if (current_row >= _filter_map_size) {
return Status::InvalidArgument(fmt::format(
"current_row >= _filter_map_size. current_row: {}, _filter_map_size: {}",
current_row, _filter_map_size));
}
}
nested_filter_map_data[i] = _filter_map_data[current_row];
}
if (current_row_ptr) {
*current_row_ptr = current_row;
}
auto new_filter = std::make_unique<FilterMap>();
RETURN_IF_ERROR(
new_filter->init(nested_filter_map_data.data(), nested_filter_map_data.size(), false));
*nested_filter_map = std::move(new_filter);
return Status::OK();
}
Status ColumnSelectVector::init(const std::vector<uint16_t>& run_length_null_map, size_t num_values,
NullMap* null_map, FilterMap* filter_map, size_t filter_map_index,
const std::unordered_set<size_t>* skipped_indices) {
_num_values = num_values;
_num_nulls = 0;
_read_index = 0;
size_t map_index = 0;
bool is_null = false;
_has_filter = filter_map->has_filter();
if (filter_map->has_filter()) {
// No run length null map is generated when _filter_all = true
// DCHECK(!filter_map->filter_all());
_data_map.resize(num_values);
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_NULL;
}
} else {
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_CONTENT;
}
}
is_null = !is_null;
}
size_t num_read = 0;
size_t i = 0;
size_t valid_count = 0;
while (valid_count < num_values) {
DCHECK_LT(filter_map_index + i, filter_map->filter_map_size());
if (skipped_indices != nullptr && skipped_indices->count(filter_map_index + i) > 0) {
++i;
continue;
}
if (filter_map->filter_map_data()[filter_map_index + i]) {
_data_map[valid_count] =
_data_map[valid_count] == FILTERED_NULL ? NULL_DATA : CONTENT;
num_read++;
}
++valid_count;
++i;
}
_num_filtered = num_values - num_read;
if (null_map != nullptr && num_read > 0) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_read);
if (_num_nulls == 0) {
memset(map_data_column.data() + null_map_index, 0, num_read);
} else if (_num_nulls == num_values) {
memset(map_data_column.data() + null_map_index, 1, num_read);
} else {
for (i = 0; i < num_values; ++i) {
if (_data_map[i] == CONTENT) {
map_data_column[null_map_index++] = (UInt8) false;
} else if (_data_map[i] == NULL_DATA) {
map_data_column[null_map_index++] = (UInt8) true;
}
}
}
}
} else {
_num_filtered = 0;
_run_length_null_map = &run_length_null_map;
if (null_map != nullptr) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_values);
for (auto& run_length : run_length_null_map) {
if (is_null) {
memset(map_data_column.data() + null_map_index, 1, run_length);
null_map_index += run_length;
_num_nulls += run_length;
} else {
memset(map_data_column.data() + null_map_index, 0, run_length);
null_map_index += run_length;
}
is_null = !is_null;
}
} else {
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
}
is_null = !is_null;
}
}
}
return Status::OK();
}
ParsedVersion::ParsedVersion(std::string application, std::optional<std::string> version,
std::optional<std::string> app_build_hash)
: _application(std::move(application)),
_version(std::move(version)),
_app_build_hash(std::move(app_build_hash)) {}
bool ParsedVersion::operator==(const ParsedVersion& other) const {
return _application == other._application && _version == other._version &&
_app_build_hash == other._app_build_hash;
}
bool ParsedVersion::operator!=(const ParsedVersion& other) const {
return !(*this == other);
}
size_t ParsedVersion::hash() const {
std::hash<std::string> hasher;
return hasher(_application) ^ (_version ? hasher(*_version) : 0) ^
(_app_build_hash ? hasher(*_app_build_hash) : 0);
}
std::string ParsedVersion::to_string() const {
return "ParsedVersion(application=" + _application +
", semver=" + (_version ? *_version : "null") +
", app_build_hash=" + (_app_build_hash ? *_app_build_hash : "null") + ")";
}
Status VersionParser::parse(const std::string& created_by,
std::unique_ptr<ParsedVersion>* parsed_version) {
static const std::string FORMAT =
"(.*?)\\s+version\\s*(?:([^(]*?)\\s*(?:\\(\\s*build\\s*([^)]*?)\\s*\\))?)?";
static const std::regex PATTERN(FORMAT);
std::smatch matcher;
if (!std::regex_match(created_by, matcher, PATTERN)) {
return Status::InternalError(fmt::format("Could not parse created_by: {}, using format: {}",
created_by, FORMAT));
}
std::string application = matcher[1].str();
if (application.empty()) {
return Status::InternalError("application cannot be null or empty");
}
std::optional<std::string> semver =
matcher[2].str().empty() ? std::nullopt : std::optional<std::string>(matcher[2].str());
std::optional<std::string> app_build_hash =
matcher[3].str().empty() ? std::nullopt : std::optional<std::string>(matcher[3].str());
*parsed_version = std::make_unique<ParsedVersion>(application, semver, app_build_hash);
return Status::OK();
}
SemanticVersion::SemanticVersion(int major, int minor, int patch)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(false),
_unknown(std::nullopt),
_pre(std::nullopt),
_build_info(std::nullopt) {}
#ifdef BE_TEST
SemanticVersion::SemanticVersion(int major, int minor, int patch, bool has_unknown)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(has_unknown),
_unknown(std::nullopt),
_pre(std::nullopt),
_build_info(std::nullopt) {}
#endif
SemanticVersion::SemanticVersion(int major, int minor, int patch,
std::optional<std::string> unknown, std::optional<std::string> pre,
std::optional<std::string> build_info)
: _major(major),
_minor(minor),
_patch(patch),
_prerelease(unknown.has_value() && !unknown.value().empty()),
_unknown(std::move(unknown)),
_pre(pre.has_value() ? std::optional<Prerelease>(Prerelease(std::move(pre.value())))
: std::nullopt),
_build_info(std::move(build_info)) {}
Status SemanticVersion::parse(const std::string& version,
std::unique_ptr<SemanticVersion>* semantic_version) {
static const std::regex pattern(R"(^(\d+)\.(\d+)\.(\d+)([^-+]*)?(?:-([^+]*))?(?:\+(.*))?$)");
std::smatch match;
if (!std::regex_match(version, match, pattern)) {
return Status::InternalError(version + " does not match format");
}
int major = std::stoi(match[1].str());
int minor = std::stoi(match[2].str());
int patch = std::stoi(match[3].str());
std::optional<std::string> unknown =
match[4].str().empty() ? std::nullopt : std::optional<std::string>(match[4].str());
std::optional<std::string> prerelease =
match[5].str().empty() ? std::nullopt : std::optional<std::string>(match[5].str());
std::optional<std::string> build_info =
match[6].str().empty() ? std::nullopt : std::optional<std::string>(match[6].str());
if (major < 0 || minor < 0 || patch < 0) {
return Status::InternalError("major({}), minor({}), and patch({}) must all be >= 0", major,
minor, patch);
}
*semantic_version =
std::make_unique<SemanticVersion>(major, minor, patch, unknown, prerelease, build_info);
return Status::OK();
}
int SemanticVersion::compare_to(const SemanticVersion& other) const {
if (int cmp = _compare_integers(_major, other._major); cmp != 0) {
return cmp;
}
if (int cmp = _compare_integers(_minor, other._minor); cmp != 0) {
return cmp;
}
if (int cmp = _compare_integers(_patch, other._patch); cmp != 0) {
return cmp;
}
if (int cmp = _compare_booleans(other._prerelease, _prerelease); cmp != 0) {
return cmp;
}
if (_pre.has_value()) {
if (other._pre.has_value()) {
return _pre.value().compare_to(other._pre.value());
} else {
return -1;
}
} else if (other._pre.has_value()) {
return 1;
}
return 0;
}
bool SemanticVersion::operator==(const SemanticVersion& other) const {
return compare_to(other) == 0;
}
bool SemanticVersion::operator!=(const SemanticVersion& other) const {
return !(*this == other);
}
std::string SemanticVersion::to_string() const {
std::string result =
std::to_string(_major) + "." + std::to_string(_minor) + "." + std::to_string(_patch);
if (_prerelease && _unknown) result += _unknown.value();
if (_pre) result += _pre.value().to_string();
if (_build_info) result += _build_info.value();
return result;
}
SemanticVersion::NumberOrString::NumberOrString(const std::string& value_string)
: _original(value_string) {
const static std::regex NUMERIC("\\d+");
_is_numeric = std::regex_match(_original, NUMERIC);
_number = -1;
if (_is_numeric) {
_number = std::stoi(_original);
}
}
SemanticVersion::NumberOrString::NumberOrString(const NumberOrString& other)
: _original(other._original), _is_numeric(other._is_numeric), _number(other._number) {}
int SemanticVersion::NumberOrString::compare_to(const SemanticVersion::NumberOrString& that) const {
if (this->_is_numeric != that._is_numeric) {
return this->_is_numeric ? -1 : 1;
}
if (_is_numeric) {
return this->_number - that._number;
}
return this->_original.compare(that._original);
}
std::string SemanticVersion::NumberOrString::to_string() const {
return _original;
}
bool SemanticVersion::NumberOrString::operator<(const SemanticVersion::NumberOrString& that) const {
return compare_to(that) < 0;
}
bool SemanticVersion::NumberOrString::operator==(
const SemanticVersion::NumberOrString& that) const {
return compare_to(that) == 0;
}
bool SemanticVersion::NumberOrString::operator!=(
const SemanticVersion::NumberOrString& that) const {
return !(*this == that);
}
bool SemanticVersion::NumberOrString::operator>(const SemanticVersion::NumberOrString& that) const {
return compare_to(that) > 0;
}
bool SemanticVersion::NumberOrString::operator<=(
const SemanticVersion::NumberOrString& that) const {
return !(*this > that);
}
bool SemanticVersion::NumberOrString::operator>=(
const SemanticVersion::NumberOrString& that) const {
return !(*this < that);
}
int SemanticVersion::_compare_integers(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
int SemanticVersion::_compare_booleans(bool x, bool y) {
return (x == y) ? 0 : (x ? 1 : -1);
}
std::vector<std::string> SemanticVersion::Prerelease::_split(const std::string& s,
const std::regex& delimiter) {
std::sregex_token_iterator iter(s.begin(), s.end(), delimiter, -1);
std::sregex_token_iterator end;
std::vector<std::string> tokens(iter, end);
return tokens;
}
SemanticVersion::Prerelease::Prerelease(std::string original) : _original(std::move(original)) {
static const std::regex DOT("\\.");
auto parts = _split(_original, DOT);
for (const auto& part : parts) {
NumberOrString number_or_string(part);
_identifiers.emplace_back(number_or_string);
}
}
int SemanticVersion::Prerelease::compare_to(const Prerelease& that) const {
auto size = std::min(this->_identifiers.size(), that._identifiers.size());
for (int i = 0; i < size; ++i) {
int cmp = this->_identifiers[i].compare_to(that._identifiers[i]);
if (cmp != 0) {
return cmp;
}
}
return static_cast<int>(this->_identifiers.size()) - static_cast<int>(that._identifiers.size());
}
std::string SemanticVersion::Prerelease::to_string() const {
return _original;
}
bool SemanticVersion::Prerelease::operator<(const Prerelease& that) const {
return compare_to(that) < 0;
}
bool SemanticVersion::Prerelease::operator==(const Prerelease& that) const {
return compare_to(that) == 0;
}
bool SemanticVersion::Prerelease::operator!=(const Prerelease& that) const {
return !(*this == that);
}
bool SemanticVersion::Prerelease::operator>(const Prerelease& that) const {
return compare_to(that) > 0;
}
bool SemanticVersion::Prerelease::operator<=(const Prerelease& that) const {
return !(*this > that);
}
bool SemanticVersion::Prerelease::operator>=(const Prerelease& that) const {
return !(*this < that);
}
const SemanticVersion CorruptStatistics::PARQUET_251_FIXED_VERSION(1, 8, 0);
const SemanticVersion CorruptStatistics::CDH_5_PARQUET_251_FIXED_START(1, 5, 0, std::nullopt,
"cdh5.5.0", std::nullopt);
const SemanticVersion CorruptStatistics::CDH_5_PARQUET_251_FIXED_END(1, 5, 0);
bool CorruptStatistics::should_ignore_statistics(const std::string& created_by,
tparquet::Type::type physical_type) {
if (physical_type != tparquet::Type::BYTE_ARRAY &&
physical_type != tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// The bug only applies to binary columns
return false;
}
if (created_by.empty()) {
// created_by is not populated
VLOG_DEBUG
<< "Ignoring statistics because created_by is null or empty! See PARQUET-251 and "
"PARQUET-297";
return true;
}
Status status;
std::unique_ptr<ParsedVersion> parsed_version;
status = VersionParser::parse(created_by, &parsed_version);
if (!status.ok()) {
VLOG_DEBUG << "Ignoring statistics because created_by could not be parsed (see "
"PARQUET-251)."
" CreatedBy: "
<< created_by << ", msg: " << status.msg();
return true;
}
if (parsed_version->application() != "parquet-mr") {
// Assume other applications don't have this bug
return false;
}
if ((!parsed_version->version().has_value()) || parsed_version->version().value().empty()) {
VLOG_DEBUG << "Ignoring statistics because created_by did not contain a semver (see "
"PARQUET-251): "
<< created_by;
return true;
}
std::unique_ptr<SemanticVersion> semantic_version;
status = SemanticVersion::parse(parsed_version->version().value(), &semantic_version);
if (!status.ok()) {
VLOG_DEBUG << "Ignoring statistics because created_by could not be parsed (see "
"PARQUET-251)."
" CreatedBy: "
<< created_by << ", msg: " << status.msg();
return true;
}
if (semantic_version->compare_to(PARQUET_251_FIXED_VERSION) < 0 &&
!(semantic_version->compare_to(CDH_5_PARQUET_251_FIXED_START) >= 0 &&
semantic_version->compare_to(CDH_5_PARQUET_251_FIXED_END) < 0)) {
VLOG_DEBUG
<< "Ignoring statistics because this file was created prior to the fixed version, "
"see PARQUET-251";
return true;
}
// This file was created after the fix
return false;
}
#include "common/compile_check_end.h"
} // namespace doris::vectorized