blob: 8df6f42496a810c90b51adc16fbfffa8200cee69 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Adaptor.hh"
#include "BloomFilter.hh"
#include "Options.hh"
#include "Reader.hh"
#include "Statistics.hh"
#include "StripeStream.hh"
#include "wrap/coded-stream-wrapper.h"
#include <algorithm>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
#include <iterator>
#include <set>
namespace orc {
const WriterVersionImpl &WriterVersionImpl::VERSION_HIVE_8732() {
static const WriterVersionImpl version(WriterVersion_HIVE_8732);
return version;
}
uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
if (ps.has_compressionblocksize()) {
return ps.compressionblocksize();
} else {
return 256 * 1024;
}
}
CompressionKind convertCompressionKind(const proto::PostScript& ps) {
if (ps.has_compression()) {
return static_cast<CompressionKind>(ps.compression());
} else {
throw ParseError("Unknown compression type");
}
}
std::string ColumnSelector::toDotColumnPath() {
if (columns.empty()) {
return std::string();
}
std::ostringstream columnStream;
std::copy(columns.begin(), columns.end(),
std::ostream_iterator<std::string>(columnStream, "."));
std::string columnPath = columnStream.str();
return columnPath.substr(0, columnPath.length() - 1);
}
void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
size_t id = static_cast<size_t>(type.getColumnId());
if (!selectedColumns[id]) {
selectedColumns[id] = true;
for(size_t c = id; c <= type.getMaximumColumnId(); ++c){
selectedColumns[c] = true;
}
}
}
/**
* Recurses over a type tree and selects the parents of every selected type.
* @return true if any child was selected.
*/
bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) {
size_t id = static_cast<size_t>(type.getColumnId());
bool result = selectedColumns[id];
for(uint64_t c=0; c < type.getSubtypeCount(); ++c) {
result |= selectParents(selectedColumns, *type.getSubtype(c));
}
selectedColumns[id] = result;
return result;
}
/**
* Recurses over a type tree and build two maps
* map<TypeName, TypeId>, map<TypeId, Type>
*/
void ColumnSelector::buildTypeNameIdMap(const Type* type) {
// map<type_id, Type*>
idTypeMap[type->getColumnId()] = type;
if (STRUCT == type->getKind()) {
for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
const std::string& fieldName = type->getFieldName(i);
columns.push_back(fieldName);
nameIdMap[toDotColumnPath()] = type->getSubtype(i)->getColumnId();
buildTypeNameIdMap(type->getSubtype(i));
columns.pop_back();
}
} else {
// other non-primitive type
for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
buildTypeNameIdMap(type->getSubtype(j));
}
}
}
void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns,
const RowReaderOptions& options) {
selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) {
for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
field != options.getInclude().end(); ++field) {
updateSelectedByFieldId(selectedColumns, *field);
}
} else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) {
for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
field != options.getIncludeNames().end(); ++field) {
updateSelectedByName(selectedColumns, *field);
}
} else if (options.getTypeIdsSet()) {
for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
typeId != options.getInclude().end(); ++typeId) {
updateSelectedByTypeId(selectedColumns, *typeId);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
selectParents(selectedColumns, *contents->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
}
void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns,
uint64_t fieldId) {
if (fieldId < contents->schema->getSubtypeCount()) {
selectChildren(selectedColumns, *contents->schema->getSubtype(fieldId));
} else {
std::stringstream buffer;
buffer << "Invalid column selected " << fieldId << " out of "
<< contents->schema->getSubtypeCount();
throw ParseError(buffer.str());
}
}
void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) {
if (typeId < selectedColumns.size()) {
const Type& type = *idTypeMap[typeId];
selectChildren(selectedColumns, type);
} else {
std::stringstream buffer;
buffer << "Invalid type id selected " << typeId << " out of "
<< selectedColumns.size();
throw ParseError(buffer.str());
}
}
void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns,
const std::string& fieldName) {
std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName);
if (ite != nameIdMap.end()) {
updateSelectedByTypeId(selectedColumns, ite->second);
} else {
throw ParseError("Invalid column selected " + fieldName);
}
}
ColumnSelector::ColumnSelector(const FileContents* _contents): contents(_contents) {
buildTypeNameIdMap(contents->schema.get());
}
RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
const RowReaderOptions& opts
): localTimezone(getLocalTimezone()),
contents(_contents),
throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
footer(contents->footer.get()),
firstRowOfStripe(*contents->pool, 0),
enableEncodedBlock(opts.getEnableLazyDecoding()) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
currentStripe = numberOfStripes;
lastStripe = 0;
currentRowInStripe = 0;
rowsInCurrentStripe = 0;
uint64_t rowTotal = 0;
firstRowOfStripe.resize(numberOfStripes);
for(size_t i=0; i < numberOfStripes; ++i) {
firstRowOfStripe[i] = rowTotal;
proto::StripeInformation stripeInfo =
footer->stripes(static_cast<int>(i));
rowTotal += stripeInfo.numberofrows();
bool isStripeInRange = stripeInfo.offset() >= opts.getOffset() &&
stripeInfo.offset() < opts.getOffset() + opts.getLength();
if (isStripeInRange) {
if (i < currentStripe) {
currentStripe = i;
}
if (i >= lastStripe) {
lastStripe = i + 1;
}
}
}
firstStripe = currentStripe;
if (currentStripe == 0) {
previousRow = (std::numeric_limits<uint64_t>::max)();
} else if (currentStripe == numberOfStripes) {
previousRow = footer->numberofrows();
} else {
previousRow = firstRowOfStripe[firstStripe]-1;
}
ColumnSelector column_selector(contents.get());
column_selector.updateSelected(selectedColumns, opts);
}
CompressionKind RowReaderImpl::getCompression() const {
return contents->compression;
}
uint64_t RowReaderImpl::getCompressionSize() const {
return contents->blockSize;
}
const std::vector<bool> RowReaderImpl::getSelectedColumns() const {
return selectedColumns;
}
const Type& RowReaderImpl::getSelectedType() const {
if (selectedSchema.get() == nullptr) {
selectedSchema = buildSelectedType(contents->schema.get(),
selectedColumns);
}
return *(selectedSchema.get());
}
uint64_t RowReaderImpl::getRowNumber() const {
return previousRow;
}
void RowReaderImpl::seekToRow(uint64_t rowNumber) {
// Empty file
if (lastStripe == 0) {
return;
}
// If we are reading only a portion of the file
// (bounded by firstStripe and lastStripe),
// seeking before or after the portion of interest should return no data.
// Implement this by setting previousRow to the number of rows in the file.
// seeking past lastStripe
uint64_t num_stripes = static_cast<uint64_t>(footer->stripes_size());
if ( (lastStripe == num_stripes
&& rowNumber >= footer->numberofrows()) ||
(lastStripe < num_stripes
&& rowNumber >= firstRowOfStripe[lastStripe]) ) {
currentStripe = num_stripes;
previousRow = footer->numberofrows();
return;
}
uint64_t seekToStripe = 0;
while (seekToStripe+1 < lastStripe &&
firstRowOfStripe[seekToStripe+1] <= rowNumber) {
seekToStripe++;
}
// seeking before the first stripe
if (seekToStripe < firstStripe) {
currentStripe = num_stripes;
previousRow = footer->numberofrows();
return;
}
currentStripe = seekToStripe;
currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
previousRow = rowNumber;
startNextStripe();
uint64_t rowsToSkip = currentRowInStripe;
if (footer->rowindexstride() > 0 &&
currentStripeInfo.indexlength() > 0) {
uint32_t rowGroupId =
static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
rowsToSkip -= static_cast<uint64_t>(rowGroupId) * footer->rowindexstride();
if (rowGroupId != 0) {
seekToRowGroup(rowGroupId);
}
}
reader->skip(rowsToSkip);
}
void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) {
// reset all previous row indexes
rowIndexes.clear();
// obtain row indexes for selected columns
uint64_t offset = currentStripeInfo.offset();
for (int i = 0; i < currentStripeFooter.streams_size(); ++i) {
const proto::Stream& pbStream = currentStripeFooter.streams(i);
uint64_t colId = pbStream.column();
if (selectedColumns[colId] && pbStream.has_kind()
&& pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
std::unique_ptr<SeekableInputStream> inStream =
createDecompressor(getCompression(),
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream
(contents->stream.get(),
offset,
pbStream.length(),
*contents->pool)),
getCompressionSize(),
*contents->pool);
proto::RowIndex rowIndex;
if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) {
throw ParseError("Failed to parse the row index");
}
rowIndexes[colId] = rowIndex;
}
offset += pbStream.length();
}
// store positions for selected columns
std::vector<std::list<uint64_t>> positions;
// store position providers for selected colimns
std::unordered_map<uint64_t, PositionProvider> positionProviders;
for (auto rowIndex = rowIndexes.cbegin();
rowIndex != rowIndexes.cend(); ++rowIndex) {
uint64_t colId = rowIndex->first;
const proto::RowIndexEntry& entry =
rowIndex->second.entry(static_cast<int32_t>(rowGroupEntryId));
// copy index positions for a specific column
positions.push_back({});
auto& position = positions.back();
for (int pos = 0; pos != entry.positions_size(); ++pos) {
position.push_back(entry.positions(pos));
}
positionProviders.insert(std::make_pair(colId, PositionProvider(position)));
}
reader->seekToRowGroup(positionProviders);
}
const FileContents& RowReaderImpl::getFileContents() const {
return *contents;
}
bool RowReaderImpl::getThrowOnHive11DecimalOverflow() const {
return throwOnHive11DecimalOverflow;
}
int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const {
return forcedScaleOnHive11Decimal;
}
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
const FileContents& contents) {
uint64_t stripeFooterStart = info.offset() + info.indexlength() +
info.datalength();
uint64_t stripeFooterLength = info.footerlength();
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents.compression,
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream(contents.stream.get(),
stripeFooterStart,
stripeFooterLength,
*contents.pool)),
contents.blockSize,
*contents.pool);
proto::StripeFooter result;
if (!result.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError(std::string("bad StripeFooter from ") +
pbStream->getName());
}
// Verify StripeFooter in case it's corrupt
if (result.columns_size() != contents.footer->types_size()) {
std::stringstream msg;
msg << "bad number of ColumnEncodings in StripeFooter: expected="
<< contents.footer->types_size() << ", actual=" << result.columns_size();
throw ParseError(msg.str());
}
return result;
}
ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> _contents,
const ReaderOptions& opts,
uint64_t _fileLength,
uint64_t _postscriptLength
): contents(std::move(_contents)),
options(opts),
fileLength(_fileLength),
postscriptLength(_postscriptLength),
footer(contents->footer.get()) {
isMetadataLoaded = false;
checkOrcVersion();
numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
contents->schema = REDUNDANT_MOVE(convertType(footer->types(0), *footer));
contents->blockSize = getCompressionBlockSize(*contents->postscript);
contents->compression= convertCompressionKind(*contents->postscript);
}
std::string ReaderImpl::getSerializedFileTail() const {
proto::FileTail tail;
proto::PostScript *mutable_ps = tail.mutable_postscript();
mutable_ps->CopyFrom(*contents->postscript);
proto::Footer *mutableFooter = tail.mutable_footer();
mutableFooter->CopyFrom(*footer);
tail.set_filelength(fileLength);
tail.set_postscriptlength(postscriptLength);
std::string result;
if (!tail.SerializeToString(&result)) {
throw ParseError("Failed to serialize file tail");
}
return result;
}
const ReaderOptions& ReaderImpl::getReaderOptions() const {
return options;
}
CompressionKind ReaderImpl::getCompression() const {
return contents->compression;
}
uint64_t ReaderImpl::getCompressionSize() const {
return contents->blockSize;
}
uint64_t ReaderImpl::getNumberOfStripes() const {
return numberOfStripes;
}
uint64_t ReaderImpl::getNumberOfStripeStatistics() const {
if (!isMetadataLoaded) {
readMetadata();
}
return metadata.get() == nullptr ? 0 :
static_cast<uint64_t>(metadata->stripestats_size());
}
std::unique_ptr<StripeInformation>
ReaderImpl::getStripe(uint64_t stripeIndex) const {
if (stripeIndex > getNumberOfStripes()) {
throw std::logic_error("stripe index out of range");
}
proto::StripeInformation stripeInfo =
footer->stripes(static_cast<int>(stripeIndex));
return std::unique_ptr<StripeInformation>
(new StripeInformationImpl
(stripeInfo.offset(),
stripeInfo.indexlength(),
stripeInfo.datalength(),
stripeInfo.footerlength(),
stripeInfo.numberofrows(),
contents->stream.get(),
*contents->pool,
contents->compression,
contents->blockSize));
}
FileVersion ReaderImpl::getFormatVersion() const {
if (contents->postscript->version_size() != 2) {
return FileVersion::v_0_11();
}
return FileVersion(
contents->postscript->version(0),
contents->postscript->version(1));
}
uint64_t ReaderImpl::getNumberOfRows() const {
return footer->numberofrows();
}
WriterId ReaderImpl::getWriterId() const {
if (footer->has_writer()) {
uint32_t id = footer->writer();
if (id > WriterId::PRESTO_WRITER) {
return WriterId::UNKNOWN_WRITER;
} else {
return static_cast<WriterId>(id);
}
}
return WriterId::ORC_JAVA_WRITER;
}
uint32_t ReaderImpl::getWriterIdValue() const {
if (footer->has_writer()) {
return footer->writer();
} else {
return WriterId::ORC_JAVA_WRITER;
}
}
WriterVersion ReaderImpl::getWriterVersion() const {
if (!contents->postscript->has_writerversion()) {
return WriterVersion_ORIGINAL;
}
return static_cast<WriterVersion>(contents->postscript->writerversion());
}
uint64_t ReaderImpl::getContentLength() const {
return footer->contentlength();
}
uint64_t ReaderImpl::getStripeStatisticsLength() const {
return contents->postscript->metadatalength();
}
uint64_t ReaderImpl::getFileFooterLength() const {
return contents->postscript->footerlength();
}
uint64_t ReaderImpl::getFilePostscriptLength() const {
return postscriptLength;
}
uint64_t ReaderImpl::getFileLength() const {
return fileLength;
}
uint64_t ReaderImpl::getRowIndexStride() const {
return footer->rowindexstride();
}
const std::string& ReaderImpl::getStreamName() const {
return contents->stream->getName();
}
std::list<std::string> ReaderImpl::getMetadataKeys() const {
std::list<std::string> result;
for(int i=0; i < footer->metadata_size(); ++i) {
result.push_back(footer->metadata(i).name());
}
return result;
}
std::string ReaderImpl::getMetadataValue(const std::string& key) const {
for(int i=0; i < footer->metadata_size(); ++i) {
if (footer->metadata(i).name() == key) {
return footer->metadata(i).value();
}
}
throw std::range_error("key not found");
}
void ReaderImpl::getRowIndexStatistics(const proto::StripeInformation& stripeInfo,
uint64_t stripeIndex, const proto::StripeFooter& currentStripeFooter,
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const {
int num_streams = currentStripeFooter.streams_size();
uint64_t offset = stripeInfo.offset();
uint64_t indexEnd = stripeInfo.offset() + stripeInfo.indexlength();
for (int i = 0; i < num_streams; i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
StreamKind streamKind = static_cast<StreamKind>(stream.kind());
uint64_t length = static_cast<uint64_t>(stream.length());
if (streamKind == StreamKind::StreamKind_ROW_INDEX) {
if (offset + length > indexEnd) {
std::stringstream msg;
msg << "Malformed RowIndex stream meta in stripe " << stripeIndex
<< ": streamOffset=" << offset << ", streamLength=" << length
<< ", stripeOffset=" << stripeInfo.offset() << ", stripeIndexLength="
<< stripeInfo.indexlength();
throw ParseError(msg.str());
}
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents->compression,
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream(contents->stream.get(),
offset,
length,
*contents->pool)),
contents->blockSize,
*(contents->pool));
proto::RowIndex rowIndex;
if (!rowIndex.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse RowIndex from stripe footer");
}
int num_entries = rowIndex.entry_size();
size_t column = static_cast<size_t>(stream.column());
for (int j = 0; j < num_entries; j++) {
const proto::RowIndexEntry& entry = rowIndex.entry(j);
(*indexStats)[column].push_back(entry.statistics());
}
}
offset += length;
}
}
bool ReaderImpl::hasMetadataValue(const std::string& key) const {
for(int i=0; i < footer->metadata_size(); ++i) {
if (footer->metadata(i).name() == key) {
return true;
}
}
return false;
}
const Type& ReaderImpl::getType() const {
return *(contents->schema.get());
}
std::unique_ptr<StripeStatistics>
ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
if (!isMetadataLoaded) {
readMetadata();
}
if (metadata.get() == nullptr) {
throw std::logic_error("No stripe statistics in file");
}
size_t num_cols = static_cast<size_t>(
metadata->stripestats(
static_cast<int>(stripeIndex)).colstats_size());
std::vector<std::vector<proto::ColumnStatistics> > indexStats(num_cols);
proto::StripeInformation currentStripeInfo =
footer->stripes(static_cast<int>(stripeIndex));
proto::StripeFooter currentStripeFooter =
getStripeFooter(currentStripeInfo, *contents.get());
getRowIndexStatistics(currentStripeInfo, stripeIndex, currentStripeFooter, &indexStats);
const Timezone& writerTZ =
currentStripeFooter.has_writertimezone() ?
getTimezoneByName(currentStripeFooter.writertimezone()) :
getLocalTimezone();
StatContext statContext(hasCorrectStatistics(), &writerTZ);
return std::unique_ptr<StripeStatistics>
(new StripeStatisticsImpl(metadata->stripestats(static_cast<int>(stripeIndex)),
indexStats, statContext));
}
std::unique_ptr<Statistics> ReaderImpl::getStatistics() const {
StatContext statContext(hasCorrectStatistics());
return std::unique_ptr<Statistics>
(new StatisticsImpl(*footer, statContext));
}
std::unique_ptr<ColumnStatistics>
ReaderImpl::getColumnStatistics(uint32_t index) const {
if (index >= static_cast<uint64_t>(footer->statistics_size())) {
throw std::logic_error("column index out of range");
}
proto::ColumnStatistics col =
footer->statistics(static_cast<int32_t>(index));
StatContext statContext(hasCorrectStatistics());
return std::unique_ptr<ColumnStatistics> (convertColumnStatistics(col, statContext));
}
void ReaderImpl::readMetadata() const {
uint64_t metadataSize = contents->postscript->metadatalength();
uint64_t footerLength = contents->postscript->footerlength();
if (fileLength < metadataSize + footerLength + postscriptLength + 1) {
std::stringstream msg;
msg << "Invalid Metadata length: fileLength=" << fileLength
<< ", metadataLength=" << metadataSize << ", footerLength=" << footerLength
<< ", postscriptLength=" << postscriptLength;
throw ParseError(msg.str());
}
uint64_t metadataStart = fileLength - metadataSize - footerLength - postscriptLength - 1;
if (metadataSize != 0) {
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents->compression,
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream(contents->stream.get(),
metadataStart,
metadataSize,
*contents->pool)),
contents->blockSize,
*contents->pool);
metadata.reset(new proto::Metadata());
if (!metadata->ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse the metadata");
}
}
isMetadataLoaded = true;
}
bool ReaderImpl::hasCorrectStatistics() const {
return !WriterVersionImpl::VERSION_HIVE_8732().compareGT(getWriterVersion());
}
void ReaderImpl::checkOrcVersion() {
FileVersion version = getFormatVersion();
if (version != FileVersion(0, 11) && version != FileVersion(0, 12)) {
*(options.getErrorStream())
<< "Warning: ORC file " << contents->stream->getName()
<< " was written in an unknown format version "
<< version.toString() << "\n";
}
}
std::unique_ptr<RowReader> ReaderImpl::createRowReader() const {
RowReaderOptions defaultOpts;
return createRowReader(defaultOpts);
}
std::unique_ptr<RowReader> ReaderImpl::createRowReader(
const RowReaderOptions& opts) const {
return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts));
}
uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_STRUCT:
return 1;
case proto::Type_Kind_INT:
case proto::Type_Kind_LONG:
case proto::Type_Kind_SHORT:
case proto::Type_Kind_FLOAT:
case proto::Type_Kind_DOUBLE:
case proto::Type_Kind_BOOLEAN:
case proto::Type_Kind_BYTE:
case proto::Type_Kind_DATE:
case proto::Type_Kind_LIST:
case proto::Type_Kind_MAP:
case proto::Type_Kind_UNION:
return 2;
case proto::Type_Kind_BINARY:
case proto::Type_Kind_DECIMAL:
case proto::Type_Kind_TIMESTAMP:
return 3;
case proto::Type_Kind_CHAR:
case proto::Type_Kind_STRING:
case proto::Type_Kind_VARCHAR:
return 4;
default:
return 0;
}
}
uint64_t ReaderImpl::getMemoryUse(int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), true);
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
ColumnSelector column_selector(contents.get());
if (contents->schema->getKind() == STRUCT && include.begin() != include.end()) {
for(std::list<uint64_t>::const_iterator field = include.begin();
field != include.end(); ++field) {
column_selector.updateSelectedByFieldId(selectedColumns, *field);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
column_selector.selectParents(selectedColumns, *contents->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
ColumnSelector column_selector(contents.get());
if (contents->schema->getKind() == STRUCT && names.begin() != names.end()) {
for(std::list<std::string>::const_iterator field = names.begin();
field != names.end(); ++field) {
column_selector.updateSelectedByName(selectedColumns, *field);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
column_selector.selectParents(selectedColumns, *contents->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) {
std::vector<bool> selectedColumns;
selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
ColumnSelector column_selector(contents.get());
if (include.begin() != include.end()) {
for(std::list<uint64_t>::const_iterator field = include.begin();
field != include.end(); ++field) {
column_selector.updateSelectedByTypeId(selectedColumns, *field);
}
} else {
// default is to select all columns
std::fill(selectedColumns.begin(), selectedColumns.end(), true);
}
column_selector.selectParents(selectedColumns, *contents->schema.get());
selectedColumns[0] = true; // column 0 is selected by default
return getMemoryUse(stripeIx, selectedColumns);
}
uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) {
uint64_t maxDataLength = 0;
if (stripeIx >= 0 && stripeIx < footer->stripes_size()) {
uint64_t stripe = footer->stripes(stripeIx).datalength();
if (maxDataLength < stripe) {
maxDataLength = stripe;
}
} else {
for (int i=0; i < footer->stripes_size(); i++) {
uint64_t stripe = footer->stripes(i).datalength();
if (maxDataLength < stripe) {
maxDataLength = stripe;
}
}
}
bool hasStringColumn = false;
uint64_t nSelectedStreams = 0;
for (int i=0; !hasStringColumn && i < footer->types_size(); i++) {
if (selectedColumns[static_cast<size_t>(i)]) {
const proto::Type& type = footer->types(i);
nSelectedStreams += maxStreamsForType(type) ;
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_CHAR:
case proto::Type_Kind_STRING:
case proto::Type_Kind_VARCHAR:
case proto::Type_Kind_BINARY: {
hasStringColumn = true;
break;
}
default: {
break;
}
}
}
}
/* If a string column is read, use stripe datalength as a memory estimate
* because we don't know the dictionary size. Multiply by 2 because
* a string column requires two buffers:
* in the input stream and in the seekable input stream.
* If no string column is read, estimate from the number of streams.
*/
uint64_t memory = hasStringColumn ? 2 * maxDataLength :
std::min(uint64_t(maxDataLength),
nSelectedStreams * contents->stream->getNaturalReadSize());
// Do we need even more memory to read the footer or the metadata?
if (memory < contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS) {
memory = contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS;
}
if (memory < contents->postscript->metadatalength()) {
memory = contents->postscript->metadatalength();
}
// Account for firstRowOfStripe.
memory += static_cast<uint64_t>(footer->stripes_size()) * sizeof(uint64_t);
// Decompressors need buffers for each stream
uint64_t decompressorMemory = 0;
if (contents->compression != CompressionKind_NONE) {
for (int i=0; i < footer->types_size(); i++) {
if (selectedColumns[static_cast<size_t>(i)]) {
const proto::Type& type = footer->types(i);
decompressorMemory += maxStreamsForType(type) * contents->blockSize;
}
}
if (contents->compression == CompressionKind_SNAPPY) {
decompressorMemory *= 2; // Snappy decompressor uses a second buffer
}
}
return memory + decompressorMemory ;
}
void RowReaderImpl::startNextStripe() {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
uint64_t fileLength = contents->stream->getLength();
if (currentStripeInfo.offset() + currentStripeInfo.indexlength() +
currentStripeInfo.datalength() + currentStripeInfo.footerlength() >= fileLength) {
std::stringstream msg;
msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength="
<< fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength="
<< currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength()
<< ", footerLength=" << currentStripeInfo.footerlength() << ")";
throw ParseError(msg.str());
}
currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
rowsInCurrentStripe = currentStripeInfo.numberofrows();
const Timezone& writerTimezone =
currentStripeFooter.has_writertimezone() ?
getTimezoneByName(currentStripeFooter.writertimezone()) :
localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo,
currentStripeFooter,
currentStripeInfo.offset(),
*(contents->stream.get()),
writerTimezone);
reader = buildReader(*contents->schema.get(), stripeStreams);
}
bool RowReaderImpl::next(ColumnVectorBatch& data) {
if (currentStripe >= lastStripe) {
data.numElements = 0;
if (lastStripe > 0) {
previousRow = firstRowOfStripe[lastStripe - 1] +
footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
} else {
previousRow = 0;
}
return false;
}
if (currentRowInStripe == 0) {
startNextStripe();
}
uint64_t rowsToRead =
std::min(static_cast<uint64_t>(data.capacity),
rowsInCurrentStripe - currentRowInStripe);
data.numElements = rowsToRead;
if (enableEncodedBlock) {
reader->nextEncoded(data, rowsToRead, nullptr);
}
else {
reader->next(data, rowsToRead, nullptr);
}
// update row number
previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
currentRowInStripe += rowsToRead;
if (currentRowInStripe >= rowsInCurrentStripe) {
currentStripe += 1;
currentRowInStripe = 0;
}
return rowsToRead != 0;
}
std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
(uint64_t capacity) const {
return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock);
}
void ensureOrcFooter(InputStream* stream,
DataBuffer<char> *buffer,
uint64_t postscriptLength) {
const std::string MAGIC("ORC");
const uint64_t magicLength = MAGIC.length();
const char * const bufferStart = buffer->data();
const uint64_t bufferLength = buffer->size();
if (postscriptLength < magicLength || bufferLength < magicLength) {
throw ParseError("Invalid ORC postscript length");
}
const char* magicStart = bufferStart + bufferLength - 1 - magicLength;
// Look for the magic string at the end of the postscript.
if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) {
// If there is no magic string at the end, check the beginning.
// Only files written by Hive 0.11.0 don't have the tail ORC string.
std::unique_ptr<char[]> frontBuffer( new char[magicLength] );
stream->read(frontBuffer.get(), magicLength, 0);
bool foundMatch = memcmp(frontBuffer.get(), MAGIC.c_str(), magicLength) == 0;
if (!foundMatch) {
throw ParseError("Not an ORC file");
}
}
}
/**
* Read the file's postscript from the given buffer.
* @param stream the file stream
* @param buffer the buffer with the tail of the file.
* @param postscriptSize the length of postscript in bytes
*/
std::unique_ptr<proto::PostScript> readPostscript(InputStream *stream,
DataBuffer<char> *buffer,
uint64_t postscriptSize) {
char *ptr = buffer->data();
uint64_t readSize = buffer->size();
ensureOrcFooter(stream, buffer, postscriptSize);
std::unique_ptr<proto::PostScript> postscript =
std::unique_ptr<proto::PostScript>(new proto::PostScript());
if (readSize < 1 + postscriptSize) {
std::stringstream msg;
msg << "Invalid ORC postscript length: " << postscriptSize << ", file length = "
<< stream->getLength();
throw ParseError(msg.str());
}
if (!postscript->ParseFromArray(ptr + readSize - 1 - postscriptSize,
static_cast<int>(postscriptSize))) {
throw ParseError("Failed to parse the postscript from " +
stream->getName());
}
return REDUNDANT_MOVE(postscript);
}
/**
* Check that proto Types are valid. Indices in the type tree should be valid,
* so we won't crash when we convert the proto::Types to TypeImpls (ORC-317).
* For STRUCT types, fieldName size should match subTypes size (ORC-581).
*/
void checkProtoTypes(const proto::Footer &footer) {
std::stringstream msg;
int maxId = footer.types_size();
if (maxId <= 0) {
throw ParseError("Footer is corrupt: no types found");
}
for (int i = 0; i < maxId; ++i) {
const proto::Type& type = footer.types(i);
if (type.kind() == proto::Type_Kind_STRUCT
&& type.subtypes_size() != type.fieldnames_size()) {
msg << "Footer is corrupt: STRUCT type " << i << " has " << type.subtypes_size()
<< " subTypes, but has " << type.fieldnames_size() << " fieldNames";
throw ParseError(msg.str());
}
for (int j = 0; j < type.subtypes_size(); ++j) {
int subTypeId = static_cast<int>(type.subtypes(j));
if (subTypeId <= i) {
msg << "Footer is corrupt: malformed link from type " << i << " to "
<< subTypeId;
throw ParseError(msg.str());
}
if (subTypeId >= maxId) {
msg << "Footer is corrupt: types(" << subTypeId << ") not exists";
throw ParseError(msg.str());
}
if (j > 0 && static_cast<int>(type.subtypes(j - 1)) >= subTypeId) {
msg << "Footer is corrupt: subType(" << (j-1) << ") >= subType(" << j
<< ") in types(" << i << "). (" << type.subtypes(j - 1) << " >= "
<< subTypeId << ")";
throw ParseError(msg.str());
}
}
}
}
/**
* Parse the footer from the given buffer.
* @param stream the file's stream
* @param buffer the buffer to parse the footer from
* @param footerOffset the offset within the buffer that contains the footer
* @param ps the file's postscript
* @param memoryPool the memory pool to use
*/
std::unique_ptr<proto::Footer> readFooter(InputStream* stream,
const DataBuffer<char> *buffer,
uint64_t footerOffset,
const proto::PostScript& ps,
MemoryPool& memoryPool) {
const char *footerPtr = buffer->data() + footerOffset;
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(convertCompressionKind(ps),
std::unique_ptr<SeekableInputStream>
(new SeekableArrayInputStream(footerPtr,
ps.footerlength())),
getCompressionBlockSize(ps),
memoryPool);
std::unique_ptr<proto::Footer> footer =
std::unique_ptr<proto::Footer>(new proto::Footer());
if (!footer->ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse the footer from " +
stream->getName());
}
checkProtoTypes(*footer);
return REDUNDANT_MOVE(footer);
}
std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream,
const ReaderOptions& options) {
std::shared_ptr<FileContents> contents = std::shared_ptr<FileContents>(new FileContents());
contents->pool = options.getMemoryPool();
contents->errorStream = options.getErrorStream();
std::string serializedFooter = options.getSerializedFileTail();
uint64_t fileLength;
uint64_t postscriptLength;
if (serializedFooter.length() != 0) {
// Parse the file tail from the serialized one.
proto::FileTail tail;
if (!tail.ParseFromString(serializedFooter)) {
throw ParseError("Failed to parse the file tail from string");
}
contents->postscript.reset(new proto::PostScript(tail.postscript()));
contents->footer.reset(new proto::Footer(tail.footer()));
fileLength = tail.filelength();
postscriptLength = tail.postscriptlength();
} else {
// figure out the size of the file using the option or filesystem
fileLength = std::min(options.getTailLocation(),
static_cast<uint64_t>(stream->getLength()));
//read last bytes into buffer to get PostScript
uint64_t readSize = std::min(fileLength, DIRECTORY_SIZE_GUESS);
if (readSize < 4) {
throw ParseError("File size too small");
}
std::unique_ptr<DataBuffer<char>> buffer( new DataBuffer<char>(*contents->pool, readSize) );
stream->read(buffer->data(), readSize, fileLength - readSize);
postscriptLength = buffer->data()[readSize - 1] & 0xff;
contents->postscript = REDUNDANT_MOVE(readPostscript(stream.get(),
buffer.get(), postscriptLength));
uint64_t footerSize = contents->postscript->footerlength();
uint64_t tailSize = 1 + postscriptLength + footerSize;
if (tailSize >= fileLength) {
std::stringstream msg;
msg << "Invalid ORC tailSize=" << tailSize << ", fileLength=" << fileLength;
throw ParseError(msg.str());
}
uint64_t footerOffset;
if (tailSize > readSize) {
buffer->resize(footerSize);
stream->read(buffer->data(), footerSize, fileLength - tailSize);
footerOffset = 0;
} else {
footerOffset = readSize - tailSize;
}
contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer.get(),
footerOffset, *contents->postscript, *contents->pool));
}
contents->stream = std::move(stream);
return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents),
options,
fileLength,
postscriptLength));
}
std::map<uint32_t, BloomFilterIndex>
ReaderImpl::getBloomFilters(uint32_t stripeIndex,
const std::set<uint32_t>& included) const {
std::map<uint32_t, BloomFilterIndex> ret;
// find stripe info
if (stripeIndex >= static_cast<uint32_t>(footer->stripes_size())) {
throw std::logic_error("Illegal stripe index: " + to_string(static_cast<int64_t>(stripeIndex)));
}
const proto::StripeInformation currentStripeInfo =
footer->stripes(static_cast<int>(stripeIndex));
const proto::StripeFooter currentStripeFooter =
getStripeFooter(currentStripeInfo, *contents);
// iterate stripe footer to get stream of bloomfilter
uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
const proto::Stream& stream = currentStripeFooter.streams(i);
uint32_t column = static_cast<uint32_t>(stream.column());
uint64_t length = static_cast<uint64_t>(stream.length());
// a bloom filter stream from a selected column is found
if (stream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8 &&
(included.empty() || included.find(column) != included.end())) {
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(contents->compression,
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream(contents->stream.get(),
offset,
length,
*contents->pool)),
contents->blockSize,
*(contents->pool));
proto::BloomFilterIndex pbBFIndex;
if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse BloomFilterIndex");
}
BloomFilterIndex bfIndex;
for (int j = 0; j < pbBFIndex.bloomfilter_size(); j++) {
std::unique_ptr<BloomFilter> entry = BloomFilterUTF8Utils::deserialize(
stream.kind(),
currentStripeFooter.columns(static_cast<int>(stream.column())),
pbBFIndex.bloomfilter(j));
bfIndex.entries.push_back(std::shared_ptr<BloomFilter>(std::move(entry)));
}
// add bloom filters to result for one column
ret[column] = bfIndex;
}
offset += length;
}
return ret;
}
RowReader::~RowReader() {
// PASS
}
Reader::~Reader() {
// PASS
}
InputStream::~InputStream() {
// PASS
};
}// namespace