blob: 8a43818a535bc8936710f17a5fa48c8bf284fc79 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Reader.hh"
#include "Adaptor.hh"
#include "BloomFilter.hh"
#include "Options.hh"
#include "Statistics.hh"
#include "StripeStream.hh"
#include "Utils.hh"
#include "wrap/coded-stream-wrapper.h"
#include <algorithm>
#include <iostream>
#include <iterator>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>
namespace orc {
// ORC files writen by these versions of cpp writers have inconsistent bloom filter
// hashing. Bloom filters of them should not be used.
static const char* BAD_CPP_BLOOM_FILTER_VERSIONS[] = {
"1.6.0", "1.6.1", "1.6.2", "1.6.3", "1.6.4", "1.6.5", "1.6.6",
"1.6.7", "1.6.8", "1.6.9", "1.6.10", "1.6.11", "1.7.0"};
ReaderMetrics* getDefaultReaderMetrics() {
static ReaderMetrics internal;
return &internal;
}
const RowReaderOptions::IdReadIntentMap EMPTY_IDREADINTENTMAP() {
return {};
}
const WriterVersionImpl& WriterVersionImpl::VERSION_HIVE_8732() {
static const WriterVersionImpl version(WriterVersion_HIVE_8732);
return version;
}
uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
if (ps.has_compression_block_size()) {
return ps.compression_block_size();
} else {
return 256 * 1024;
}
}
CompressionKind convertCompressionKind(const proto::PostScript& ps) {
if (ps.has_compression()) {
return static_cast<CompressionKind>(ps.compression());
} else {
throw ParseError("Unknown compression type");
}
}
std::string ColumnSelector::toDotColumnPath() {
if (columns_.empty()) {
return std::string();
}
std::ostringstream columnStream;
std::copy(columns_.begin(), columns_.end(),
std::ostream_iterator<std::string>(columnStream, "."));
std::string columnPath = columnStream.str();
return columnPath.substr(0, columnPath.length() - 1);
}
WriterVersion getWriterVersionImpl(const FileContents* contents) {
if (!contents->postscript->has_writer_version()) {
return WriterVersion_ORIGINAL;
}
return static_cast<WriterVersion>(contents->postscript->writer_version());
}
void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
return selectChildren(selectedColumns, type, EMPTY_IDREADINTENTMAP());
}
void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type,
const RowReaderOptions::IdReadIntentMap& idReadIntentMap) {
size_t id = static_cast<size_t>(type.getColumnId());
TypeKind kind = type.getKind();
if (!selectedColumns[id]) {
selectedColumns[id] = true;
bool selectChild = true;
if (kind == TypeKind::LIST || kind == TypeKind::MAP || kind == TypeKind::UNION) {
auto elem = idReadIntentMap.find(id);
if (elem != idReadIntentMap.end() && elem->second == ReadIntent_OFFSETS) {
selectChild = false;
}
}
if (selectChild) {
for (size_t c = id; c <= type.getMaximumColumnId(); ++c) {
selectedColumns[c] = true;
}
}
}
}
/**
* Recurses over a type tree and selects the parents of every selected type.
* @return true if any child was selected.
*/
bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) {
size_t id = static_cast<size_t>(type.getColumnId());
bool result = selectedColumns[id];
uint64_t numSubtypeSelected = 0;
for (uint64_t c = 0; c < type.getSubtypeCount(); ++c) {
if (selectParents(selectedColumns, *type.getSubtype(c))) {
result = true;
numSubtypeSelected++;
}
}
selectedColumns[id] = result;
if (type.getKind() == TypeKind::UNION && selectedColumns[id]) {
if (0 < numSubtypeSelected && numSubtypeSelected < type.getSubtypeCount()) {
// Subtypes of UNION should be fully selected or not selected at all.
// Override partial subtype selections with full selections.
for (uint64_t c = 0; c < type.getSubtypeCount(); ++c) {
selectChildren(selectedColumns, *type.getSubtype(c));
}
}
}
return result;
}
/**
* Recurses over a type tree and build two maps
* map<TypeName, TypeId>, map<TypeId, Type>
*/
void ColumnSelector::buildTypeNameIdMap(const Type* type) {
// map<type_id, Type*>
idTypeMap_[type->getColumnId()] = type;
if (STRUCT == type->getKind()) {
for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
const std::string& fieldName = type->getFieldName(i);
columns_.push_back(fieldName);
nameIdMap_[toDotColumnPath()] = type->getSubtype(i)->getColumnId();
buildTypeNameIdMap(type->getSubtype(i));
columns_.pop_back();
}
} else {
// other non-primitive type
for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
buildTypeNameIdMap(type->getSubtype(j));
}
}
}
void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns,
const RowReaderOptions& options) {
selectedColumns.assign(static_cast<size_t>(contents_->footer->types_size()), false);
if (contents_->schema->getKind() == STRUCT && options.getIndexesSet()) {
for (std::list<uint64_t>::const_iterator field = options.getInclude().begin();
field != options.getInclude().end(); ++field) {
updateSelectedByFieldId(selectedColumns, *field);
}
} else if (contents_->schema->getKind() == STRUCT && options.getNamesSet()) {
for (std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
field != options.getIncludeNames().end(); ++field) {
updateSelectedByName(selectedColumns, *field);
}
} else if (options.getTypeIdsSet()) {
const RowReaderOptions::IdReadIntentMap idReadIntentMap = options.getIdReadIntentMap();
for (std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
typeId != options.getInclude().end(); ++typeId) {
updateSelectedByTypeId(selectedColumns, *typeId, idReadIntentMap);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
selectParents(selectedColumns, *contents_->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
}
void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns,
uint64_t fieldId) {
if (fieldId < contents_->schema->getSubtypeCount()) {
selectChildren(selectedColumns, *contents_->schema->getSubtype(fieldId));
} else {
std::stringstream buffer;
buffer << "Invalid column selected " << fieldId << " out of "
<< contents_->schema->getSubtypeCount();
throw ParseError(buffer.str());
}
}
void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) {
updateSelectedByTypeId(selectedColumns, typeId, EMPTY_IDREADINTENTMAP());
}
void ColumnSelector::updateSelectedByTypeId(
std::vector<bool>& selectedColumns, uint64_t typeId,
const RowReaderOptions::IdReadIntentMap& idReadIntentMap) {
if (typeId < selectedColumns.size()) {
const Type& type = *idTypeMap_[typeId];
selectChildren(selectedColumns, type, idReadIntentMap);
} else {
std::stringstream buffer;
buffer << "Invalid type id selected " << typeId << " out of " << selectedColumns.size();
throw ParseError(buffer.str());
}
}
void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns,
const std::string& fieldName) {
std::map<std::string, uint64_t>::const_iterator ite = nameIdMap_.find(fieldName);
if (ite != nameIdMap_.end()) {
updateSelectedByTypeId(selectedColumns, ite->second);
} else {
bool first = true;
std::ostringstream ss;
ss << "Invalid column selected " << fieldName << ". Valid names are ";
for (auto it = nameIdMap_.begin(); it != nameIdMap_.end(); ++it) {
if (!first) ss << ", ";
ss << it->first;
first = false;
}
throw ParseError(ss.str());
}
}
ColumnSelector::ColumnSelector(const FileContents* contents) : contents_(contents) {
buildTypeNameIdMap(contents_->schema.get());
}
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> contents, const RowReaderOptions& opts)
: localTimezone_(getLocalTimezone()),
contents_(contents),
throwOnHive11DecimalOverflow_(opts.getThrowOnHive11DecimalOverflow()),
forcedScaleOnHive11Decimal_(opts.getForcedScaleOnHive11Decimal()),
footer_(contents_->footer.get()),
firstRowOfStripe_(*contents_->pool, 0),
enableEncodedBlock_(opts.getEnableLazyDecoding()),
readerTimezone_(getTimezoneByName(opts.getTimezoneName())),
schemaEvolution_(opts.getReadType(), contents_->schema.get()) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer_->stripes_size());
currentStripe_ = numberOfStripes;
lastStripe_ = 0;
currentRowInStripe_ = 0;
rowsInCurrentStripe_ = 0;
numRowGroupsInStripeRange_ = 0;
useTightNumericVector_ = opts.getUseTightNumericVector();
throwOnSchemaEvolutionOverflow_ = opts.getThrowOnSchemaEvolutionOverflow();
uint64_t rowTotal = 0;
firstRowOfStripe_.resize(numberOfStripes);
for (size_t i = 0; i < numberOfStripes; ++i) {
firstRowOfStripe_[i] = rowTotal;
proto::StripeInformation stripeInfo = footer_->stripes(static_cast<int>(i));
rowTotal += stripeInfo.number_of_rows();
bool isStripeInRange = stripeInfo.offset() >= opts.getOffset() &&
stripeInfo.offset() < opts.getOffset() + opts.getLength();
if (isStripeInRange) {
if (i < currentStripe_) {
currentStripe_ = i;
}
if (i >= lastStripe_) {
lastStripe_ = i + 1;
}
if (footer_->row_index_stride() > 0) {
numRowGroupsInStripeRange_ +=
(stripeInfo.number_of_rows() + footer_->row_index_stride() - 1) /
footer_->row_index_stride();
}
}
}
firstStripe_ = currentStripe_;
processingStripe_ = lastStripe_;
if (currentStripe_ == 0) {
previousRow_ = (std::numeric_limits<uint64_t>::max)();
} else if (currentStripe_ == numberOfStripes) {
previousRow_ = footer_->number_of_rows();
} else {
previousRow_ = firstRowOfStripe_[firstStripe_] - 1;
}
ColumnSelector column_selector(contents_.get());
column_selector.updateSelected(selectedColumns_, opts);
// prepare SargsApplier if SearchArgument is available
if (opts.getSearchArgument() && footer_->row_index_stride() > 0) {
sargs_ = opts.getSearchArgument();
sargsApplier_.reset(
new SargsApplier(*contents_->schema, sargs_.get(), footer_->row_index_stride(),
getWriterVersionImpl(contents.get()), contents_->readerMetrics));
}
skipBloomFilters_ = hasBadBloomFilters();
}
// Check if the file has inconsistent bloom filters.
bool RowReaderImpl::hasBadBloomFilters() {
// Only C++ writer in old releases could have bad bloom filters.
if (footer_->writer() != ORC_CPP_WRITER) return false;
// 'softwareVersion' is added in 1.5.13, 1.6.11, and 1.7.0.
// 1.6.x releases before 1.6.11 won't have it. On the other side, the C++ writer
// supports writing bloom filters since 1.6.0. So files written by the C++ writer
// and with 'softwareVersion' unset would have bad bloom filters.
if (!footer_->has_software_version()) return true;
const std::string& fullVersion = footer_->software_version();
std::string version;
// Deal with snapshot versions, e.g. 1.6.12-SNAPSHOT.
if (fullVersion.find('-') != std::string::npos) {
version = fullVersion.substr(0, fullVersion.find('-'));
} else {
version = fullVersion;
}
for (const char* v : BAD_CPP_BLOOM_FILTER_VERSIONS) {
if (version == v) {
return true;
}
}
return false;
}
CompressionKind RowReaderImpl::getCompression() const {
return contents_->compression;
}
uint64_t RowReaderImpl::getCompressionSize() const {
return contents_->blockSize;
}
const std::vector<bool> RowReaderImpl::getSelectedColumns() const {
return selectedColumns_;
}
const Type& RowReaderImpl::getSelectedType() const {
if (selectedSchema_.get() == nullptr) {
selectedSchema_ = buildSelectedType(contents_->schema.get(), selectedColumns_);
}
return *(selectedSchema_.get());
}
uint64_t RowReaderImpl::getRowNumber() const {
return previousRow_;
}
void RowReaderImpl::seekToRow(uint64_t rowNumber) {
// Empty file
if (lastStripe_ == 0) {
return;
}
// If we are reading only a portion of the file
// (bounded by firstStripe and lastStripe),
// seeking before or after the portion of interest should return no data.
// Implement this by setting previousRow to the number of rows in the file.
// seeking past lastStripe
uint64_t num_stripes = static_cast<uint64_t>(footer_->stripes_size());
if ((lastStripe_ == num_stripes && rowNumber >= footer_->number_of_rows()) ||
(lastStripe_ < num_stripes && rowNumber >= firstRowOfStripe_[lastStripe_])) {
currentStripe_ = num_stripes;
previousRow_ = footer_->number_of_rows();
return;
}
uint64_t seekToStripe = 0;
while (seekToStripe + 1 < lastStripe_ && firstRowOfStripe_[seekToStripe + 1] <= rowNumber) {
seekToStripe++;
}
// seeking before the first stripe
if (seekToStripe < firstStripe_) {
currentStripe_ = num_stripes;
previousRow_ = footer_->number_of_rows();
return;
}
previousRow_ = rowNumber;
auto rowIndexStride = footer_->row_index_stride();
if (!isCurrentStripeInited() || currentStripe_ != seekToStripe || rowIndexStride == 0 ||
currentStripeInfo_.index_length() == 0) {
// current stripe is not initialized or
// target stripe is not current stripe or
// current stripe doesn't have row indexes
currentStripe_ = seekToStripe;
currentRowInStripe_ = rowNumber - firstRowOfStripe_[currentStripe_];
startNextStripe();
if (currentStripe_ >= lastStripe_) {
return;
}
} else {
currentRowInStripe_ = rowNumber - firstRowOfStripe_[currentStripe_];
if (sargsApplier_) {
// advance to selected row group if predicate pushdown is enabled
currentRowInStripe_ =
advanceToNextRowGroup(currentRowInStripe_, rowsInCurrentStripe_,
footer_->row_index_stride(), sargsApplier_->getNextSkippedRows());
}
}
uint64_t rowsToSkip = currentRowInStripe_;
// seek to the target row group if row indexes exists
if (rowIndexStride > 0 && currentStripeInfo_.index_length() > 0) {
if (rowIndexes_.empty()) {
loadStripeIndex();
}
// TODO(ORC-1175): process the failures of loadStripeIndex() call
seekToRowGroup(static_cast<uint32_t>(rowsToSkip / rowIndexStride));
// skip leading rows in the target row group
rowsToSkip %= rowIndexStride;
}
// 'reader' is reset in startNextStripe(). It could be nullptr if 'rowsToSkip' is 0,
// e.g. when startNextStripe() skips all remaining rows of the file.
if (rowsToSkip > 0) {
reader_->skip(rowsToSkip);
}
}
void RowReaderImpl::loadStripeIndex() {
// reset all previous row indexes
rowIndexes_.clear();
bloomFilterIndex_.clear();
// obtain row indexes for selected columns
uint64_t offset = currentStripeInfo_.offset();
for (int i = 0; i < currentStripeFooter_.streams_size(); ++i) {
const proto::Stream& pbStream = currentStripeFooter_.streams(i);
uint64_t colId = pbStream.column();
if (selectedColumns_[colId] && pbStream.has_kind() &&
(pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
getCompression(),
std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
contents_->stream.get(), offset, pbStream.length(), *contents_->pool)),
getCompressionSize(), *contents_->pool, contents_->readerMetrics);
if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
proto::RowIndex rowIndex;
if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
throw ParseError("Failed to parse the row index");
}
rowIndexes_[colId] = rowIndex;
} else if (!skipBloomFilters_) { // Stream_Kind_BLOOM_FILTER_UTF8
proto::BloomFilterIndex pbBFIndex;
if (!pbBFIndex.ParseFromZeroCopyStream(inStream.get())) {
throw ParseError("Failed to parse bloom filter index");
}
BloomFilterIndex bfIndex;
for (int j = 0; j < pbBFIndex.bloom_filter_size(); j++) {
bfIndex.entries.push_back(BloomFilterUTF8Utils::deserialize(
pbStream.kind(), currentStripeFooter_.columns(static_cast<int>(pbStream.column())),
pbBFIndex.bloom_filter(j)));
}
// add bloom filters to result for one column
bloomFilterIndex_[pbStream.column()] = bfIndex;
}
}
offset += pbStream.length();
}
}
void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
// store positions for selected columns
std::list<std::list<uint64_t>> positions;
// store position providers for selected colimns
std::unordered_map<uint64_t, PositionProvider> positionProviders;
for (auto rowIndex = rowIndexes_.cbegin(); rowIndex != rowIndexes_.cend(); ++rowIndex) {
uint64_t colId = rowIndex->first;
const proto::RowIndexEntry& entry =
rowIndex->second.entry(static_cast<int32_t>(rowGroupEntryId));
// copy index positions for a specific column
positions.push_back({});
auto& position = positions.back();
for (int pos = 0; pos != entry.positions_size(); ++pos) {
position.push_back(entry.positions(pos));
}
positionProviders.insert(std::make_pair(colId, PositionProvider(position)));
}
reader_->seekToRowGroup(positionProviders);
}
const FileContents& RowReaderImpl::getFileContents() const {
return *contents_;
}
bool RowReaderImpl::getThrowOnHive11DecimalOverflow() const {
return throwOnHive11DecimalOverflow_;
}
bool RowReaderImpl::getIsDecimalAsLong() const {
return contents_->isDecimalAsLong;
}
int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const {
return forcedScaleOnHive11Decimal_;
}
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
const FileContents& contents) {
uint64_t stripeFooterStart = info.offset() + info.index_length() + info.data_length();
uint64_t stripeFooterLength = info.footer_length();
std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
contents.compression,
std::make_unique<SeekableFileInputStream>(contents.stream.get(), stripeFooterStart,
stripeFooterLength, *contents.pool),
contents.blockSize, *contents.pool, contents.readerMetrics);
proto::StripeFooter result;
if (!result.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError(std::string("bad StripeFooter from ") + pbStream->getName());
}
// Verify StripeFooter in case it's corrupt
if (result.columns_size() != contents.footer->types_size()) {
std::stringstream msg;
msg << "bad number of ColumnEncodings in StripeFooter: expected="
<< contents.footer->types_size() << ", actual=" << result.columns_size();
throw ParseError(msg.str());
}
return result;
}
ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> contents, const ReaderOptions& opts,
uint64_t fileLength, uint64_t postscriptLength)
: contents_(std::move(contents)),
options_(opts),
fileLength_(fileLength),
postscriptLength_(postscriptLength),
footer_(contents_->footer.get()) {
isMetadataLoaded_ = false;
checkOrcVersion();
numberOfStripes_ = static_cast<uint64_t>(footer_->stripes_size());
contents_->schema = convertType(footer_->types(0), *footer_);
contents_->blockSize = getCompressionBlockSize(*contents_->postscript);
contents_->compression = convertCompressionKind(*contents_->postscript);
}
std::string ReaderImpl::getSerializedFileTail() const {
proto::FileTail tail;
proto::PostScript* mutable_ps = tail.mutable_postscript();
mutable_ps->CopyFrom(*contents_->postscript);
proto::Footer* mutableFooter = tail.mutable_footer();
mutableFooter->CopyFrom(*footer_);
tail.set_file_length(fileLength_);
tail.set_postscript_length(postscriptLength_);
std::string result;
if (!tail.SerializeToString(&result)) {
throw ParseError("Failed to serialize file tail");
}
return result;
}
const ReaderOptions& ReaderImpl::getReaderOptions() const {
return options_;
}
CompressionKind ReaderImpl::getCompression() const {
return contents_->compression;
}
uint64_t ReaderImpl::getCompressionSize() const {
return contents_->blockSize;
}
uint64_t ReaderImpl::getNumberOfStripes() const {
return numberOfStripes_;
}
uint64_t ReaderImpl::getNumberOfStripeStatistics() const {
if (!isMetadataLoaded_) {
readMetadata();
}
return contents_->metadata == nullptr
? 0
: static_cast<uint64_t>(contents_->metadata->stripe_stats_size());
}
std::unique_ptr<StripeInformation> ReaderImpl::getStripe(uint64_t stripeIndex) const {
if (stripeIndex > getNumberOfStripes()) {
throw std::logic_error("stripe index out of range");
}
proto::StripeInformation stripeInfo = footer_->stripes(static_cast<int>(stripeIndex));
return std::unique_ptr<StripeInformation>(new StripeInformationImpl(
stripeInfo.offset(), stripeInfo.index_length(), stripeInfo.data_length(),
stripeInfo.footer_length(), stripeInfo.number_of_rows(), contents_->stream.get(),
*contents_->pool, contents_->compression, contents_->blockSize, contents_->readerMetrics));
}
FileVersion ReaderImpl::getFormatVersion() const {
if (contents_->postscript->version_size() != 2) {
return FileVersion::v_0_11();
}
return {contents_->postscript->version(0), contents_->postscript->version(1)};
}
uint64_t ReaderImpl::getNumberOfRows() const {
return footer_->number_of_rows();
}
WriterId ReaderImpl::getWriterId() const {
if (footer_->has_writer()) {
uint32_t id = footer_->writer();
if (id > WriterId::CUDF_WRITER) {
return WriterId::UNKNOWN_WRITER;
} else {
return static_cast<WriterId>(id);
}
}
return WriterId::ORC_JAVA_WRITER;
}
uint32_t ReaderImpl::getWriterIdValue() const {
if (footer_->has_writer()) {
return footer_->writer();
} else {
return WriterId::ORC_JAVA_WRITER;
}
}
std::string ReaderImpl::getSoftwareVersion() const {
std::ostringstream buffer;
buffer << writerIdToString(getWriterIdValue());
if (footer_->has_software_version()) {
buffer << " " << footer_->software_version();
}
return buffer.str();
}
WriterVersion ReaderImpl::getWriterVersion() const {
return getWriterVersionImpl(contents_.get());
}
uint64_t ReaderImpl::getContentLength() const {
return footer_->content_length();
}
uint64_t ReaderImpl::getStripeStatisticsLength() const {
return contents_->postscript->metadata_length();
}
uint64_t ReaderImpl::getFileFooterLength() const {
return contents_->postscript->footer_length();
}
uint64_t ReaderImpl::getFilePostscriptLength() const {
return postscriptLength_;
}
uint64_t ReaderImpl::getFileLength() const {
return fileLength_;
}
uint64_t ReaderImpl::getRowIndexStride() const {
return footer_->row_index_stride();
}
const std::string& ReaderImpl::getStreamName() const {
return contents_->stream->getName();
}
std::list<std::string> ReaderImpl::getMetadataKeys() const {
std::list<std::string> result;
for (int i = 0; i < footer_->metadata_size(); ++i) {
result.push_back(footer_->metadata(i).name());
}
return result;
}
std::string ReaderImpl::getMetadataValue(const std::string& key) const {
for (int i = 0; i < footer_->metadata_size(); ++i) {
if (footer_->metadata(i).name() == key) {
return footer_->metadata(i).value();
}
}
throw std::range_error("key not found");
}
void ReaderImpl::getRowIndexStatistics(
const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const {
int num_streams = currentStripeFooter.streams_size();
uint64_t offset = stripeInfo.offset();
uint64_t indexEnd = stripeInfo.offset() + stripeInfo.index_length();
for (int i = 0; i < num_streams; i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
StreamKind streamKind = static_cast<StreamKind>(stream.kind());
uint64_t length = static_cast<uint64_t>(stream.length());
if (streamKind == StreamKind::StreamKind_ROW_INDEX) {
if (offset + length > indexEnd) {
std::stringstream msg;
msg << "Malformed RowIndex stream meta in stripe " << stripeIndex
<< ": streamOffset=" << offset << ", streamLength=" << length
<< ", stripeOffset=" << stripeInfo.offset()
<< ", stripeIndexLength=" << stripeInfo.index_length();
throw ParseError(msg.str());
}
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents_->compression,
std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
contents_->stream.get(), offset, length, *contents_->pool)),
contents_->blockSize, *(contents_->pool), contents_->readerMetrics);
proto::RowIndex rowIndex;
if (!rowIndex.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse RowIndex from stripe footer");
}
int num_entries = rowIndex.entry_size();
size_t column = static_cast<size_t>(stream.column());
for (int j = 0; j < num_entries; j++) {
const proto::RowIndexEntry& entry = rowIndex.entry(j);
(*indexStats)[column].push_back(entry.statistics());
}
}
offset += length;
}
}
bool ReaderImpl::hasMetadataValue(const std::string& key) const {
for (int i = 0; i < footer_->metadata_size(); ++i) {
if (footer_->metadata(i).name() == key) {
return true;
}
}
return false;
}
const Type& ReaderImpl::getType() const {
return *(contents_->schema.get());
}
std::unique_ptr<StripeStatistics> ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
if (!isMetadataLoaded_) {
readMetadata();
}
if (contents_->metadata == nullptr) {
throw std::logic_error("No stripe statistics in file");
}
size_t num_cols = static_cast<size_t>(
contents_->metadata->stripe_stats(static_cast<int>(stripeIndex)).col_stats_size());
std::vector<std::vector<proto::ColumnStatistics>> indexStats(num_cols);
proto::StripeInformation currentStripeInfo = footer_->stripes(static_cast<int>(stripeIndex));
proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents_.get());
getRowIndexStatistics(currentStripeInfo, stripeIndex, currentStripeFooter, &indexStats);
const Timezone& writerTZ = currentStripeFooter.has_writer_timezone()
? getTimezoneByName(currentStripeFooter.writer_timezone())
: getLocalTimezone();
StatContext statContext(hasCorrectStatistics(), &writerTZ);
return std::make_unique<StripeStatisticsImpl>(
contents_->metadata->stripe_stats(static_cast<int>(stripeIndex)), indexStats, statContext);
}
std::unique_ptr<Statistics> ReaderImpl::getStatistics() const {
StatContext statContext(hasCorrectStatistics());
return std::make_unique<StatisticsImpl>(*footer_, statContext);
}
std::unique_ptr<ColumnStatistics> ReaderImpl::getColumnStatistics(uint32_t index) const {
if (index >= static_cast<uint64_t>(footer_->statistics_size())) {
throw std::logic_error("column index out of range");
}
proto::ColumnStatistics col = footer_->statistics(static_cast<int32_t>(index));
StatContext statContext(hasCorrectStatistics());
return std::unique_ptr<ColumnStatistics>(convertColumnStatistics(col, statContext));
}
void ReaderImpl::readMetadata() const {
uint64_t metadataSize = contents_->postscript->metadata_length();
uint64_t footerLength = contents_->postscript->footer_length();
if (fileLength_ < metadataSize + footerLength + postscriptLength_ + 1) {
std::stringstream msg;
msg << "Invalid Metadata length: fileLength=" << fileLength_
<< ", metadataLength=" << metadataSize << ", footerLength=" << footerLength
<< ", postscriptLength=" << postscriptLength_;
throw ParseError(msg.str());
}
uint64_t metadataStart = fileLength_ - metadataSize - footerLength - postscriptLength_ - 1;
if (metadataSize != 0) {
std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
contents_->compression,
std::make_unique<SeekableFileInputStream>(contents_->stream.get(), metadataStart,
metadataSize, *contents_->pool),
contents_->blockSize, *contents_->pool, contents_->readerMetrics);
contents_->metadata.reset(new proto::Metadata());
if (!contents_->metadata->ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse the metadata");
}
}
isMetadataLoaded_ = true;
}
bool ReaderImpl::hasCorrectStatistics() const {
return !WriterVersionImpl::VERSION_HIVE_8732().compareGT(getWriterVersion());
}
void ReaderImpl::checkOrcVersion() {
FileVersion version = getFormatVersion();
if (version != FileVersion(0, 11) && version != FileVersion(0, 12)) {
*(options_.getErrorStream())
<< "Warning: ORC file " << contents_->stream->getName()
<< " was written in an unknown format version " << version.toString() << "\n";
}
}
std::unique_ptr<RowReader> ReaderImpl::createRowReader() const {
RowReaderOptions defaultOpts;
return createRowReader(defaultOpts);
}
std::unique_ptr<RowReader> ReaderImpl::createRowReader(const RowReaderOptions& opts) const {
if (opts.getSearchArgument() && !isMetadataLoaded_) {
// load stripe statistics for PPD
readMetadata();
}
return std::make_unique<RowReaderImpl>(contents_, opts);
}
uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_STRUCT:
return 1;
case proto::Type_Kind_INT:
case proto::Type_Kind_LONG:
case proto::Type_Kind_SHORT:
case proto::Type_Kind_FLOAT:
case proto::Type_Kind_DOUBLE:
case proto::Type_Kind_BOOLEAN:
case proto::Type_Kind_BYTE:
case proto::Type_Kind_DATE:
case proto::Type_Kind_LIST:
case proto::Type_Kind_MAP:
case proto::Type_Kind_UNION:
return 2;
case proto::Type_Kind_BINARY:
case proto::Type_Kind_DECIMAL:
case proto::Type_Kind_TIMESTAMP:
case proto::Type_Kind_TIMESTAMP_INSTANT:
return 3;
case proto::Type_Kind_CHAR:
case proto::Type_Kind_STRING:
case proto::Type_Kind_VARCHAR:
return 4;
default:
return 0;
}
}
uint64_t ReaderImpl::getMemoryUse(int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents_->footer->types_size()), true);
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents_->footer->types_size()), false);
ColumnSelector column_selector(contents_.get());
if (contents_->schema->getKind() == STRUCT && include.begin() != include.end()) {
for (std::list<uint64_t>::const_iterator field = include.begin(); field != include.end();
++field) {
column_selector.updateSelectedByFieldId(selectedColumns, *field);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
column_selector.selectParents(selectedColumns, *contents_->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents_->footer->types_size()), false);
ColumnSelector column_selector(contents_.get());
if (contents_->schema->getKind() == STRUCT && names.begin() != names.end()) {
for (std::list<std::string>::const_iterator field = names.begin(); field != names.end();
++field) {
column_selector.updateSelectedByName(selectedColumns, *field);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
column_selector.selectParents(selectedColumns, *contents_->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents_->footer->types_size()), false);
ColumnSelector column_selector(contents_.get());
if (include.begin() != include.end()) {
for (std::list<uint64_t>::const_iterator field = include.begin(); field != include.end();
++field) {
column_selector.updateSelectedByTypeId(selectedColumns, *field);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
column_selector.selectParents(selectedColumns, *contents_->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) {
uint64_t maxDataLength = 0;
if (stripeIx >= 0 && stripeIx < footer_->stripes_size()) {
uint64_t stripe = footer_->stripes(stripeIx).data_length();
if (maxDataLength < stripe) {
maxDataLength = stripe;
}
} else {
for (int i = 0; i < footer_->stripes_size(); i++) {
uint64_t stripe = footer_->stripes(i).data_length();
if (maxDataLength < stripe) {
maxDataLength = stripe;
}
}
}
bool hasStringColumn = false;
uint64_t nSelectedStreams = 0;
for (int i = 0; !hasStringColumn && i < footer_->types_size(); i++) {
if (selectedColumns[static_cast<size_t>(i)]) {
const proto::Type& type = footer_->types(i);
nSelectedStreams += maxStreamsForType(type);
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_CHAR:
case proto::Type_Kind_STRING:
case proto::Type_Kind_VARCHAR:
case proto::Type_Kind_BINARY: {
hasStringColumn = true;
break;
}
default: {
break;
}
}
}
}
/* If a string column is read, use stripe data_length as a memory estimate
* because we don't know the dictionary size. Multiply by 2 because
* a string column requires two buffers:
* in the input stream and in the seekable input stream.
* If no string column is read, estimate from the number of streams.
*/
uint64_t memory = hasStringColumn
? 2 * maxDataLength
: std::min(uint64_t(maxDataLength),
nSelectedStreams * contents_->stream->getNaturalReadSize());
// Do we need even more memory to read the footer or the metadata?
if (memory < contents_->postscript->footer_length() + DIRECTORY_SIZE_GUESS) {
memory = contents_->postscript->footer_length() + DIRECTORY_SIZE_GUESS;
}
if (memory < contents_->postscript->metadata_length()) {
memory = contents_->postscript->metadata_length();
}
// Account for firstRowOfStripe.
memory += static_cast<uint64_t>(footer_->stripes_size()) * sizeof(uint64_t);
// Decompressors need buffers for each stream
uint64_t decompressorMemory = 0;
if (contents_->compression != CompressionKind_NONE) {
for (int i = 0; i < footer_->types_size(); i++) {
if (selectedColumns[static_cast<size_t>(i)]) {
const proto::Type& type = footer_->types(i);
decompressorMemory += maxStreamsForType(type) * contents_->blockSize;
}
}
if (contents_->compression == CompressionKind_SNAPPY) {
decompressorMemory *= 2; // Snappy decompressor uses a second buffer
}
}
return memory + decompressorMemory;
}
// Update fields to indicate we've reached the end of file
void RowReaderImpl::markEndOfFile() {
currentStripe_ = lastStripe_;
currentRowInStripe_ = 0;
rowsInCurrentStripe_ = 0;
if (lastStripe_ == 0) {
// Empty file
previousRow_ = 0;
} else {
previousRow_ = firstRowOfStripe_[lastStripe_ - 1] +
footer_->stripes(static_cast<int>(lastStripe_ - 1)).number_of_rows();
}
}
void RowReaderImpl::startNextStripe() {
reader_.reset(); // ColumnReaders use lots of memory; free old memory first
rowIndexes_.clear();
bloomFilterIndex_.clear();
// evaluate file statistics if it exists
if (sargsApplier_ &&
!sargsApplier_->evaluateFileStatistics(*footer_, numRowGroupsInStripeRange_)) {
// skip the entire file
markEndOfFile();
return;
}
do {
currentStripeInfo_ = footer_->stripes(static_cast<int>(currentStripe_));
uint64_t fileLength = contents_->stream->getLength();
if (currentStripeInfo_.offset() + currentStripeInfo_.index_length() +
currentStripeInfo_.data_length() + currentStripeInfo_.footer_length() >=
fileLength) {
std::stringstream msg;
msg << "Malformed StripeInformation at stripe index " << currentStripe_
<< ": fileLength=" << fileLength
<< ", StripeInfo=(offset=" << currentStripeInfo_.offset()
<< ", indexLength=" << currentStripeInfo_.index_length()
<< ", dataLength=" << currentStripeInfo_.data_length()
<< ", footerLength=" << currentStripeInfo_.footer_length() << ")";
throw ParseError(msg.str());
}
rowsInCurrentStripe_ = currentStripeInfo_.number_of_rows();
processingStripe_ = currentStripe_;
bool isStripeNeeded = true;
// If PPD enabled and stripe stats existed, evaulate it first
if (sargsApplier_ && contents_->metadata) {
const auto& currentStripeStats =
contents_->metadata->stripe_stats(static_cast<int>(currentStripe_));
// skip this stripe after stats fail to satisfy sargs
uint64_t stripeRowGroupCount =
(rowsInCurrentStripe_ + footer_->row_index_stride() - 1) / footer_->row_index_stride();
isStripeNeeded =
sargsApplier_->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
}
if (isStripeNeeded) {
currentStripeFooter_ = getStripeFooter(currentStripeInfo_, *contents_.get());
if (sargsApplier_) {
// read row group statistics and bloom filters of current stripe
loadStripeIndex();
// select row groups to read in the current stripe
sargsApplier_->pickRowGroups(rowsInCurrentStripe_, rowIndexes_, bloomFilterIndex_);
if (sargsApplier_->hasSelectedFrom(currentRowInStripe_)) {
// current stripe has at least one row group matching the predicate
break;
}
isStripeNeeded = false;
}
}
if (!isStripeNeeded) {
// advance to next stripe when current stripe has no matching rows
currentStripe_ += 1;
currentRowInStripe_ = 0;
}
} while (sargsApplier_ && currentStripe_ < lastStripe_);
if (currentStripe_ < lastStripe_) {
// get writer timezone info from stripe footer to help understand timestamp values.
const Timezone& writerTimezone =
currentStripeFooter_.has_writer_timezone()
? getTimezoneByName(currentStripeFooter_.writer_timezone())
: localTimezone_;
StripeStreamsImpl stripeStreams(*this, currentStripe_, currentStripeInfo_,
currentStripeFooter_, currentStripeInfo_.offset(),
*contents_->stream, writerTimezone, readerTimezone_);
reader_ = buildReader(*contents_->schema, stripeStreams, useTightNumericVector_,
throwOnSchemaEvolutionOverflow_, /*convertToReadType=*/true);
if (sargsApplier_) {
// move to the 1st selected row group when PPD is enabled.
currentRowInStripe_ =
advanceToNextRowGroup(currentRowInStripe_, rowsInCurrentStripe_,
footer_->row_index_stride(), sargsApplier_->getNextSkippedRows());
previousRow_ = firstRowOfStripe_[currentStripe_] + currentRowInStripe_ - 1;
if (currentRowInStripe_ > 0) {
seekToRowGroup(static_cast<uint32_t>(currentRowInStripe_ / footer_->row_index_stride()));
}
}
} else {
// All remaining stripes are skipped.
markEndOfFile();
}
}
bool RowReaderImpl::next(ColumnVectorBatch& data) {
SCOPED_STOPWATCH(contents->readerMetrics, ReaderInclusiveLatencyUs, ReaderCall);
if (currentStripe_ >= lastStripe_) {
data.numElements = 0;
markEndOfFile();
return false;
}
if (currentRowInStripe_ == 0) {
startNextStripe();
}
uint64_t rowsToRead =
std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe_ - currentRowInStripe_);
if (sargsApplier_ && rowsToRead > 0) {
rowsToRead =
computeBatchSize(rowsToRead, currentRowInStripe_, rowsInCurrentStripe_,
footer_->row_index_stride(), sargsApplier_->getNextSkippedRows());
}
data.numElements = rowsToRead;
if (rowsToRead == 0) {
markEndOfFile();
return false;
}
if (enableEncodedBlock_) {
reader_->nextEncoded(data, rowsToRead, nullptr);
} else {
reader_->next(data, rowsToRead, nullptr);
}
// update row number
previousRow_ = firstRowOfStripe_[currentStripe_] + currentRowInStripe_;
currentRowInStripe_ += rowsToRead;
// check if we need to advance to next selected row group
if (sargsApplier_) {
uint64_t nextRowToRead =
advanceToNextRowGroup(currentRowInStripe_, rowsInCurrentStripe_,
footer_->row_index_stride(), sargsApplier_->getNextSkippedRows());
if (currentRowInStripe_ != nextRowToRead) {
// it is guaranteed to be at start of a row group
currentRowInStripe_ = nextRowToRead;
if (currentRowInStripe_ < rowsInCurrentStripe_) {
seekToRowGroup(static_cast<uint32_t>(currentRowInStripe_ / footer_->row_index_stride()));
}
}
}
if (currentRowInStripe_ >= rowsInCurrentStripe_) {
currentStripe_ += 1;
currentRowInStripe_ = 0;
}
return rowsToRead != 0;
}
uint64_t RowReaderImpl::computeBatchSize(uint64_t requestedSize, uint64_t currentRowInStripe,
uint64_t rowsInCurrentStripe, uint64_t rowIndexStride,
const std::vector<uint64_t>& nextSkippedRows) {
// In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
// groups are selected then marker position is set to the end of range (subset of row groups
// within stripe).
uint64_t endRowInStripe = rowsInCurrentStripe;
uint64_t groupsInStripe = nextSkippedRows.size();
if (groupsInStripe > 0) {
auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
if (rg >= groupsInStripe) return 0;
uint64_t nextSkippedRow = nextSkippedRows[rg];
if (nextSkippedRow == 0) return 0;
endRowInStripe = nextSkippedRow;
}
return std::min(requestedSize, endRowInStripe - currentRowInStripe);
}
uint64_t RowReaderImpl::advanceToNextRowGroup(uint64_t currentRowInStripe,
uint64_t rowsInCurrentStripe,
uint64_t rowIndexStride,
const std::vector<uint64_t>& nextSkippedRows) {
auto groupsInStripe = nextSkippedRows.size();
if (groupsInStripe == 0) {
// No PPD, keeps using the current row in stripe
return std::min(currentRowInStripe, rowsInCurrentStripe);
}
auto rg = static_cast<uint32_t>(currentRowInStripe / rowIndexStride);
if (rg >= groupsInStripe) {
// Points to the end of the stripe
return rowsInCurrentStripe;
}
if (nextSkippedRows[rg] != 0) {
// Current row group is selected
return currentRowInStripe;
}
// Advance to the next selected row group
while (rg < groupsInStripe && nextSkippedRows[rg] == 0) ++rg;
if (rg < groupsInStripe) {
return rg * rowIndexStride;
}
return rowsInCurrentStripe;
}
static void getColumnIds(const Type* type, std::set<uint64_t>& columnIds) {
columnIds.insert(type->getColumnId());
for (uint64_t i = 0; i < type->getSubtypeCount(); ++i) {
getColumnIds(type->getSubtype(i), columnIds);
}
}
std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch(uint64_t capacity) const {
// If the read type is specified, then check that the selected schema matches the read type
// on the first call to createRowBatch.
if (schemaEvolution_.getReadType() && selectedSchema_.get() == nullptr) {
auto fileSchema = &getSelectedType();
auto readType = schemaEvolution_.getReadType();
std::set<uint64_t> readColumns, fileColumns;
getColumnIds(readType, readColumns);
getColumnIds(fileSchema, fileColumns);
if (readColumns != fileColumns) {
std::ostringstream ss;
ss << "The selected schema " << fileSchema->toString() << " doesn't match read type "
<< readType->toString();
throw SchemaEvolutionError(ss.str());
}
}
const Type& readType =
schemaEvolution_.getReadType() ? *schemaEvolution_.getReadType() : getSelectedType();
return readType.createRowBatch(capacity, *contents_->pool, enableEncodedBlock_,
useTightNumericVector_);
}
void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
const std::string MAGIC("ORC");
const uint64_t magicLength = MAGIC.length();
const char* const bufferStart = buffer->data();
const uint64_t bufferLength = buffer->size();
if (postscriptLength < magicLength || bufferLength < magicLength) {
throw ParseError("Invalid ORC postscript length");
}
const char* magicStart = bufferStart + bufferLength - 1 - magicLength;
// Look for the magic string at the end of the postscript.
if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) {
// If there is no magic string at the end, check the beginning.
// Only files written by Hive 0.11.0 don't have the tail ORC string.
std::unique_ptr<char[]> frontBuffer(new char[magicLength]);
stream->read(frontBuffer.get(), magicLength, 0);
bool foundMatch = memcmp(frontBuffer.get(), MAGIC.c_str(), magicLength) == 0;
if (!foundMatch) {
throw ParseError("Not an ORC file");
}
}
}
/**
* Read the file's postscript from the given buffer.
* @param stream the file stream
* @param buffer the buffer with the tail of the file.
* @param postscriptSize the length of postscript in bytes
*/
std::unique_ptr<proto::PostScript> readPostscript(InputStream* stream, DataBuffer<char>* buffer,
uint64_t postscriptSize) {
char* ptr = buffer->data();
uint64_t readSize = buffer->size();
ensureOrcFooter(stream, buffer, postscriptSize);
auto postscript = std::make_unique<proto::PostScript>();
if (readSize < 1 + postscriptSize) {
std::stringstream msg;
msg << "Invalid ORC postscript length: " << postscriptSize
<< ", file length = " << stream->getLength();
throw ParseError(msg.str());
}
if (!postscript->ParseFromArray(ptr + readSize - 1 - postscriptSize,
static_cast<int>(postscriptSize))) {
throw ParseError("Failed to parse the postscript from " + stream->getName());
}
return postscript;
}
/**
* Check that proto Types are valid. Indices in the type tree should be valid,
* so we won't crash when we convert the proto::Types to TypeImpls (ORC-317).
* For STRUCT types, fieldName size should match subTypes size (ORC-581).
*/
void checkProtoTypes(const proto::Footer& footer) {
std::stringstream msg;
int maxId = footer.types_size();
if (maxId <= 0) {
throw ParseError("Footer is corrupt: no types found");
}
for (int i = 0; i < maxId; ++i) {
const proto::Type& type = footer.types(i);
if (type.kind() == proto::Type_Kind_STRUCT &&
type.subtypes_size() != type.field_names_size()) {
msg << "Footer is corrupt: STRUCT type " << i << " has " << type.subtypes_size()
<< " subTypes, but has " << type.field_names_size() << " fieldNames";
throw ParseError(msg.str());
}
for (int j = 0; j < type.subtypes_size(); ++j) {
int subTypeId = static_cast<int>(type.subtypes(j));
if (subTypeId <= i) {
msg << "Footer is corrupt: malformed link from type " << i << " to " << subTypeId;
throw ParseError(msg.str());
}
if (subTypeId >= maxId) {
msg << "Footer is corrupt: types(" << subTypeId << ") not exists";
throw ParseError(msg.str());
}
if (j > 0 && static_cast<int>(type.subtypes(j - 1)) >= subTypeId) {
msg << "Footer is corrupt: subType(" << (j - 1) << ") >= subType(" << j << ") in types("
<< i << "). (" << type.subtypes(j - 1) << " >= " << subTypeId << ")";
throw ParseError(msg.str());
}
}
}
}
/**
* Parse the footer from the given buffer.
* @param stream the file's stream
* @param buffer the buffer to parse the footer from
* @param footerOffset the offset within the buffer that contains the footer
* @param ps the file's postscript
* @param memoryPool the memory pool to use
*/
std::unique_ptr<proto::Footer> readFooter(InputStream* stream, const DataBuffer<char>* buffer,
uint64_t footerOffset, const proto::PostScript& ps,
MemoryPool& memoryPool, ReaderMetrics* readerMetrics) {
const char* footerPtr = buffer->data() + footerOffset;
std::unique_ptr<SeekableInputStream> pbStream = createDecompressor(
convertCompressionKind(ps),
std::make_unique<SeekableArrayInputStream>(footerPtr, ps.footer_length()),
getCompressionBlockSize(ps), memoryPool, readerMetrics);
auto footer = std::make_unique<proto::Footer>();
if (!footer->ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse the footer from " + stream->getName());
}
checkProtoTypes(*footer);
return footer;
}
std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream,
const ReaderOptions& options) {
auto contents = std::make_shared<FileContents>();
contents->pool = options.getMemoryPool();
contents->errorStream = options.getErrorStream();
contents->readerMetrics = options.getReaderMetrics();
std::string serializedFooter = options.getSerializedFileTail();
uint64_t fileLength;
uint64_t postscriptLength;
if (serializedFooter.length() != 0) {
// Parse the file tail from the serialized one.
proto::FileTail tail;
if (!tail.ParseFromString(serializedFooter)) {
throw ParseError("Failed to parse the file tail from string");
}
contents->postscript = std::make_unique<proto::PostScript>(tail.postscript());
contents->footer = std::make_unique<proto::Footer>(tail.footer());
fileLength = tail.file_length();
postscriptLength = tail.postscript_length();
} else {
// figure out the size of the file using the option or filesystem
fileLength = std::min(options.getTailLocation(), static_cast<uint64_t>(stream->getLength()));
// read last bytes into buffer to get PostScript
uint64_t readSize = std::min(fileLength, DIRECTORY_SIZE_GUESS);
if (readSize < 4) {
throw ParseError("File size too small");
}
auto buffer = std::make_unique<DataBuffer<char>>(*contents->pool, readSize);
stream->read(buffer->data(), readSize, fileLength - readSize);
postscriptLength = buffer->data()[readSize - 1] & 0xff;
contents->postscript = readPostscript(stream.get(), buffer.get(), postscriptLength);
uint64_t footerSize = contents->postscript->footer_length();
uint64_t tailSize = 1 + postscriptLength + footerSize;
if (tailSize >= fileLength) {
std::stringstream msg;
msg << "Invalid ORC tailSize=" << tailSize << ", fileLength=" << fileLength;
throw ParseError(msg.str());
}
uint64_t footerOffset;
if (tailSize > readSize) {
buffer->resize(footerSize);
stream->read(buffer->data(), footerSize, fileLength - tailSize);
footerOffset = 0;
} else {
footerOffset = readSize - tailSize;
}
contents->footer = readFooter(stream.get(), buffer.get(), footerOffset, *contents->postscript,
*contents->pool, contents->readerMetrics);
}
contents->isDecimalAsLong = false;
if (contents->postscript->version_size() == 2) {
FileVersion v(contents->postscript->version(0), contents->postscript->version(1));
if (v == FileVersion::UNSTABLE_PRE_2_0()) {
contents->isDecimalAsLong = true;
}
}
contents->stream = std::move(stream);
return std::make_unique<ReaderImpl>(std::move(contents), options, fileLength, postscriptLength);
}
std::map<uint32_t, BloomFilterIndex> ReaderImpl::getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const {
std::map<uint32_t, BloomFilterIndex> ret;
// find stripe info
if (stripeIndex >= static_cast<uint32_t>(footer_->stripes_size())) {
throw std::logic_error("Illegal stripe index: " +
to_string(static_cast<int64_t>(stripeIndex)));
}
const proto::StripeInformation currentStripeInfo =
footer_->stripes(static_cast<int>(stripeIndex));
const proto::StripeFooter currentStripeFooter = getStripeFooter(currentStripeInfo, *contents_);
// iterate stripe footer to get stream of bloom_filter
uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
uint32_t column = static_cast<uint32_t>(stream.column());
uint64_t length = static_cast<uint64_t>(stream.length());
// a bloom filter stream from a selected column is found
if (stream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8 &&
(included.empty() || included.find(column) != included.end())) {
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents_->compression,
std::make_unique<SeekableFileInputStream>(
contents_->stream.get(), offset, length, *contents_->pool),
contents_->blockSize, *(contents_->pool), contents_->readerMetrics);
proto::BloomFilterIndex pbBFIndex;
if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse BloomFilterIndex");
}
BloomFilterIndex bfIndex;
for (int j = 0; j < pbBFIndex.bloom_filter_size(); j++) {
std::unique_ptr<BloomFilter> entry = BloomFilterUTF8Utils::deserialize(
stream.kind(), currentStripeFooter.columns(static_cast<int>(stream.column())),
pbBFIndex.bloom_filter(j));
bfIndex.entries.push_back(std::shared_ptr<BloomFilter>(std::move(entry)));
}
// add bloom filters to result for one column
ret[column] = bfIndex;
}
offset += length;
}
return ret;
}
RowReader::~RowReader() {
// PASS
}
Reader::~Reader() {
// PASS
}
InputStream::~InputStream(){
// PASS
};
} // namespace orc