| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "Adaptor.hh" |
| #include "BloomFilter.hh" |
| #include "Options.hh" |
| #include "Reader.hh" |
| #include "Statistics.hh" |
| #include "StripeStream.hh" |
| |
| #include "wrap/coded-stream-wrapper.h" |
| |
| #include <algorithm> |
| #include <iostream> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <vector> |
| #include <iterator> |
| #include <set> |
| |
| namespace orc { |
| |
| const WriterVersionImpl &WriterVersionImpl::VERSION_HIVE_8732() { |
| static const WriterVersionImpl version(WriterVersion_HIVE_8732); |
| return version; |
| } |
| |
| uint64_t getCompressionBlockSize(const proto::PostScript& ps) { |
| if (ps.has_compressionblocksize()) { |
| return ps.compressionblocksize(); |
| } else { |
| return 256 * 1024; |
| } |
| } |
| |
| CompressionKind convertCompressionKind(const proto::PostScript& ps) { |
| if (ps.has_compression()) { |
| return static_cast<CompressionKind>(ps.compression()); |
| } else { |
| throw ParseError("Unknown compression type"); |
| } |
| } |
| |
| std::string ColumnSelector::toDotColumnPath() { |
| if (columns.empty()) { |
| return std::string(); |
| } |
| std::ostringstream columnStream; |
| std::copy(columns.begin(), columns.end(), |
| std::ostream_iterator<std::string>(columnStream, ".")); |
| std::string columnPath = columnStream.str(); |
| return columnPath.substr(0, columnPath.length() - 1); |
| } |
| |
| |
| void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) { |
| size_t id = static_cast<size_t>(type.getColumnId()); |
| if (!selectedColumns[id]) { |
| selectedColumns[id] = true; |
| for(size_t c = id; c <= type.getMaximumColumnId(); ++c){ |
| selectedColumns[c] = true; |
| } |
| } |
| } |
| |
| /** |
| * Recurses over a type tree and selects the parents of every selected type. |
| * @return true if any child was selected. |
| */ |
| bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) { |
| size_t id = static_cast<size_t>(type.getColumnId()); |
| bool result = selectedColumns[id]; |
| for(uint64_t c=0; c < type.getSubtypeCount(); ++c) { |
| result |= selectParents(selectedColumns, *type.getSubtype(c)); |
| } |
| selectedColumns[id] = result; |
| return result; |
| } |
| |
| /** |
| * Recurses over a type tree and build two maps |
| * map<TypeName, TypeId>, map<TypeId, Type> |
| */ |
| void ColumnSelector::buildTypeNameIdMap(const Type* type) { |
| // map<type_id, Type*> |
| idTypeMap[type->getColumnId()] = type; |
| |
| if (STRUCT == type->getKind()) { |
| for (size_t i = 0; i < type->getSubtypeCount(); ++i) { |
| const std::string& fieldName = type->getFieldName(i); |
| columns.push_back(fieldName); |
| nameIdMap[toDotColumnPath()] = type->getSubtype(i)->getColumnId(); |
| buildTypeNameIdMap(type->getSubtype(i)); |
| columns.pop_back(); |
| } |
| } else { |
| // other non-primitive type |
| for (size_t j = 0; j < type->getSubtypeCount(); ++j) { |
| buildTypeNameIdMap(type->getSubtype(j)); |
| } |
| } |
| } |
| |
| void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns, |
| const RowReaderOptions& options) { |
| selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false); |
| if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) { |
| for(std::list<uint64_t>::const_iterator field = options.getInclude().begin(); |
| field != options.getInclude().end(); ++field) { |
| updateSelectedByFieldId(selectedColumns, *field); |
| } |
| } else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) { |
| for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin(); |
| field != options.getIncludeNames().end(); ++field) { |
| updateSelectedByName(selectedColumns, *field); |
| } |
| } else if (options.getTypeIdsSet()) { |
| for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin(); |
| typeId != options.getInclude().end(); ++typeId) { |
| updateSelectedByTypeId(selectedColumns, *typeId); |
| } |
| } else { |
| // default is to select all columns |
| std::fill(selectedColumns.begin(), selectedColumns.end(), true); |
| } |
| selectParents(selectedColumns, *contents->schema.get()); |
| selectedColumns[0] = true; // column 0 is selected by default |
| } |
| |
| void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns, |
| uint64_t fieldId) { |
| if (fieldId < contents->schema->getSubtypeCount()) { |
| selectChildren(selectedColumns, *contents->schema->getSubtype(fieldId)); |
| } else { |
| std::stringstream buffer; |
| buffer << "Invalid column selected " << fieldId << " out of " |
| << contents->schema->getSubtypeCount(); |
| throw ParseError(buffer.str()); |
| } |
| } |
| |
| void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) { |
| if (typeId < selectedColumns.size()) { |
| const Type& type = *idTypeMap[typeId]; |
| selectChildren(selectedColumns, type); |
| } else { |
| std::stringstream buffer; |
| buffer << "Invalid type id selected " << typeId << " out of " |
| << selectedColumns.size(); |
| throw ParseError(buffer.str()); |
| } |
| } |
| |
| void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns, |
| const std::string& fieldName) { |
| std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName); |
| if (ite != nameIdMap.end()) { |
| updateSelectedByTypeId(selectedColumns, ite->second); |
| } else { |
| throw ParseError("Invalid column selected " + fieldName); |
| } |
| } |
| |
| ColumnSelector::ColumnSelector(const FileContents* _contents): contents(_contents) { |
| buildTypeNameIdMap(contents->schema.get()); |
| } |
| |
| RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents, |
| const RowReaderOptions& opts |
| ): localTimezone(getLocalTimezone()), |
| contents(_contents), |
| throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()), |
| forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()), |
| footer(contents->footer.get()), |
| firstRowOfStripe(*contents->pool, 0), |
| enableEncodedBlock(opts.getEnableLazyDecoding()) { |
| uint64_t numberOfStripes; |
| numberOfStripes = static_cast<uint64_t>(footer->stripes_size()); |
| currentStripe = numberOfStripes; |
| lastStripe = 0; |
| currentRowInStripe = 0; |
| rowsInCurrentStripe = 0; |
| uint64_t rowTotal = 0; |
| |
| firstRowOfStripe.resize(numberOfStripes); |
| for(size_t i=0; i < numberOfStripes; ++i) { |
| firstRowOfStripe[i] = rowTotal; |
| proto::StripeInformation stripeInfo = |
| footer->stripes(static_cast<int>(i)); |
| rowTotal += stripeInfo.numberofrows(); |
| bool isStripeInRange = stripeInfo.offset() >= opts.getOffset() && |
| stripeInfo.offset() < opts.getOffset() + opts.getLength(); |
| if (isStripeInRange) { |
| if (i < currentStripe) { |
| currentStripe = i; |
| } |
| if (i >= lastStripe) { |
| lastStripe = i + 1; |
| } |
| } |
| } |
| firstStripe = currentStripe; |
| |
| if (currentStripe == 0) { |
| previousRow = (std::numeric_limits<uint64_t>::max)(); |
| } else if (currentStripe == numberOfStripes) { |
| previousRow = footer->numberofrows(); |
| } else { |
| previousRow = firstRowOfStripe[firstStripe]-1; |
| } |
| |
| ColumnSelector column_selector(contents.get()); |
| column_selector.updateSelected(selectedColumns, opts); |
| } |
| |
| CompressionKind RowReaderImpl::getCompression() const { |
| return contents->compression; |
| } |
| |
| uint64_t RowReaderImpl::getCompressionSize() const { |
| return contents->blockSize; |
| } |
| |
| const std::vector<bool> RowReaderImpl::getSelectedColumns() const { |
| return selectedColumns; |
| } |
| |
| const Type& RowReaderImpl::getSelectedType() const { |
| if (selectedSchema.get() == nullptr) { |
| selectedSchema = buildSelectedType(contents->schema.get(), |
| selectedColumns); |
| } |
| return *(selectedSchema.get()); |
| } |
| |
| uint64_t RowReaderImpl::getRowNumber() const { |
| return previousRow; |
| } |
| |
| void RowReaderImpl::seekToRow(uint64_t rowNumber) { |
| // Empty file |
| if (lastStripe == 0) { |
| return; |
| } |
| |
| // If we are reading only a portion of the file |
| // (bounded by firstStripe and lastStripe), |
| // seeking before or after the portion of interest should return no data. |
| // Implement this by setting previousRow to the number of rows in the file. |
| |
| // seeking past lastStripe |
| uint64_t num_stripes = static_cast<uint64_t>(footer->stripes_size()); |
| if ( (lastStripe == num_stripes |
| && rowNumber >= footer->numberofrows()) || |
| (lastStripe < num_stripes |
| && rowNumber >= firstRowOfStripe[lastStripe]) ) { |
| currentStripe = num_stripes; |
| previousRow = footer->numberofrows(); |
| return; |
| } |
| |
| uint64_t seekToStripe = 0; |
| while (seekToStripe+1 < lastStripe && |
| firstRowOfStripe[seekToStripe+1] <= rowNumber) { |
| seekToStripe++; |
| } |
| |
| // seeking before the first stripe |
| if (seekToStripe < firstStripe) { |
| currentStripe = num_stripes; |
| previousRow = footer->numberofrows(); |
| return; |
| } |
| |
| currentStripe = seekToStripe; |
| currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe]; |
| previousRow = rowNumber; |
| startNextStripe(); |
| |
| uint64_t rowsToSkip = currentRowInStripe; |
| |
| if (footer->rowindexstride() > 0 && |
| currentStripeInfo.indexlength() > 0) { |
| uint32_t rowGroupId = |
| static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride()); |
| rowsToSkip -= static_cast<uint64_t>(rowGroupId) * footer->rowindexstride(); |
| |
| if (rowGroupId != 0) { |
| seekToRowGroup(rowGroupId); |
| } |
| } |
| |
| reader->skip(rowsToSkip); |
| } |
| |
| void RowReaderImpl::seekToRowGroup(uint32_t rowGroupEntryId) { |
| // reset all previous row indexes |
| rowIndexes.clear(); |
| |
| // obtain row indexes for selected columns |
| uint64_t offset = currentStripeInfo.offset(); |
| for (int i = 0; i < currentStripeFooter.streams_size(); ++i) { |
| const proto::Stream& pbStream = currentStripeFooter.streams(i); |
| uint64_t colId = pbStream.column(); |
| if (selectedColumns[colId] && pbStream.has_kind() |
| && pbStream.kind() == proto::Stream_Kind_ROW_INDEX) { |
| std::unique_ptr<SeekableInputStream> inStream = |
| createDecompressor(getCompression(), |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream |
| (contents->stream.get(), |
| offset, |
| pbStream.length(), |
| *contents->pool)), |
| getCompressionSize(), |
| *contents->pool); |
| |
| proto::RowIndex rowIndex; |
| if (!rowIndex.ParseFromZeroCopyStream(inStream.get())) { |
| throw ParseError("Failed to parse the row index"); |
| } |
| |
| rowIndexes[colId] = rowIndex; |
| } |
| offset += pbStream.length(); |
| } |
| |
| // store positions for selected columns |
| std::vector<std::list<uint64_t>> positions; |
| // store position providers for selected colimns |
| std::unordered_map<uint64_t, PositionProvider> positionProviders; |
| |
| for (auto rowIndex = rowIndexes.cbegin(); |
| rowIndex != rowIndexes.cend(); ++rowIndex) { |
| uint64_t colId = rowIndex->first; |
| const proto::RowIndexEntry& entry = |
| rowIndex->second.entry(static_cast<int32_t>(rowGroupEntryId)); |
| |
| // copy index positions for a specific column |
| positions.push_back({}); |
| auto& position = positions.back(); |
| for (int pos = 0; pos != entry.positions_size(); ++pos) { |
| position.push_back(entry.positions(pos)); |
| } |
| positionProviders.insert(std::make_pair(colId, PositionProvider(position))); |
| } |
| |
| reader->seekToRowGroup(positionProviders); |
| } |
| |
| const FileContents& RowReaderImpl::getFileContents() const { |
| return *contents; |
| } |
| |
| bool RowReaderImpl::getThrowOnHive11DecimalOverflow() const { |
| return throwOnHive11DecimalOverflow; |
| } |
| |
| int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const { |
| return forcedScaleOnHive11Decimal; |
| } |
| |
| proto::StripeFooter getStripeFooter(const proto::StripeInformation& info, |
| const FileContents& contents) { |
| uint64_t stripeFooterStart = info.offset() + info.indexlength() + |
| info.datalength(); |
| uint64_t stripeFooterLength = info.footerlength(); |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(contents.compression, |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream(contents.stream.get(), |
| stripeFooterStart, |
| stripeFooterLength, |
| *contents.pool)), |
| contents.blockSize, |
| *contents.pool); |
| proto::StripeFooter result; |
| if (!result.ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError(std::string("bad StripeFooter from ") + |
| pbStream->getName()); |
| } |
| // Verify StripeFooter in case it's corrupt |
| if (result.columns_size() != contents.footer->types_size()) { |
| std::stringstream msg; |
| msg << "bad number of ColumnEncodings in StripeFooter: expected=" |
| << contents.footer->types_size() << ", actual=" << result.columns_size(); |
| throw ParseError(msg.str()); |
| } |
| return result; |
| } |
| |
| ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> _contents, |
| const ReaderOptions& opts, |
| uint64_t _fileLength, |
| uint64_t _postscriptLength |
| ): contents(std::move(_contents)), |
| options(opts), |
| fileLength(_fileLength), |
| postscriptLength(_postscriptLength), |
| footer(contents->footer.get()) { |
| isMetadataLoaded = false; |
| checkOrcVersion(); |
| numberOfStripes = static_cast<uint64_t>(footer->stripes_size()); |
| contents->schema = REDUNDANT_MOVE(convertType(footer->types(0), *footer)); |
| contents->blockSize = getCompressionBlockSize(*contents->postscript); |
| contents->compression= convertCompressionKind(*contents->postscript); |
| } |
| |
| std::string ReaderImpl::getSerializedFileTail() const { |
| proto::FileTail tail; |
| proto::PostScript *mutable_ps = tail.mutable_postscript(); |
| mutable_ps->CopyFrom(*contents->postscript); |
| proto::Footer *mutableFooter = tail.mutable_footer(); |
| mutableFooter->CopyFrom(*footer); |
| tail.set_filelength(fileLength); |
| tail.set_postscriptlength(postscriptLength); |
| std::string result; |
| if (!tail.SerializeToString(&result)) { |
| throw ParseError("Failed to serialize file tail"); |
| } |
| return result; |
| } |
| |
| const ReaderOptions& ReaderImpl::getReaderOptions() const { |
| return options; |
| } |
| |
| CompressionKind ReaderImpl::getCompression() const { |
| return contents->compression; |
| } |
| |
| uint64_t ReaderImpl::getCompressionSize() const { |
| return contents->blockSize; |
| } |
| |
| uint64_t ReaderImpl::getNumberOfStripes() const { |
| return numberOfStripes; |
| } |
| |
| uint64_t ReaderImpl::getNumberOfStripeStatistics() const { |
| if (!isMetadataLoaded) { |
| readMetadata(); |
| } |
| return metadata.get() == nullptr ? 0 : |
| static_cast<uint64_t>(metadata->stripestats_size()); |
| } |
| |
| std::unique_ptr<StripeInformation> |
| ReaderImpl::getStripe(uint64_t stripeIndex) const { |
| if (stripeIndex > getNumberOfStripes()) { |
| throw std::logic_error("stripe index out of range"); |
| } |
| proto::StripeInformation stripeInfo = |
| footer->stripes(static_cast<int>(stripeIndex)); |
| |
| return std::unique_ptr<StripeInformation> |
| (new StripeInformationImpl |
| (stripeInfo.offset(), |
| stripeInfo.indexlength(), |
| stripeInfo.datalength(), |
| stripeInfo.footerlength(), |
| stripeInfo.numberofrows(), |
| contents->stream.get(), |
| *contents->pool, |
| contents->compression, |
| contents->blockSize)); |
| } |
| |
| FileVersion ReaderImpl::getFormatVersion() const { |
| if (contents->postscript->version_size() != 2) { |
| return FileVersion::v_0_11(); |
| } |
| return FileVersion( |
| contents->postscript->version(0), |
| contents->postscript->version(1)); |
| } |
| |
| uint64_t ReaderImpl::getNumberOfRows() const { |
| return footer->numberofrows(); |
| } |
| |
| WriterId ReaderImpl::getWriterId() const { |
| if (footer->has_writer()) { |
| uint32_t id = footer->writer(); |
| if (id > WriterId::PRESTO_WRITER) { |
| return WriterId::UNKNOWN_WRITER; |
| } else { |
| return static_cast<WriterId>(id); |
| } |
| } |
| return WriterId::ORC_JAVA_WRITER; |
| } |
| |
| uint32_t ReaderImpl::getWriterIdValue() const { |
| if (footer->has_writer()) { |
| return footer->writer(); |
| } else { |
| return WriterId::ORC_JAVA_WRITER; |
| } |
| } |
| |
| WriterVersion ReaderImpl::getWriterVersion() const { |
| if (!contents->postscript->has_writerversion()) { |
| return WriterVersion_ORIGINAL; |
| } |
| return static_cast<WriterVersion>(contents->postscript->writerversion()); |
| } |
| |
| uint64_t ReaderImpl::getContentLength() const { |
| return footer->contentlength(); |
| } |
| |
| uint64_t ReaderImpl::getStripeStatisticsLength() const { |
| return contents->postscript->metadatalength(); |
| } |
| |
| uint64_t ReaderImpl::getFileFooterLength() const { |
| return contents->postscript->footerlength(); |
| } |
| |
| uint64_t ReaderImpl::getFilePostscriptLength() const { |
| return postscriptLength; |
| } |
| |
| uint64_t ReaderImpl::getFileLength() const { |
| return fileLength; |
| } |
| |
| uint64_t ReaderImpl::getRowIndexStride() const { |
| return footer->rowindexstride(); |
| } |
| |
| const std::string& ReaderImpl::getStreamName() const { |
| return contents->stream->getName(); |
| } |
| |
| std::list<std::string> ReaderImpl::getMetadataKeys() const { |
| std::list<std::string> result; |
| for(int i=0; i < footer->metadata_size(); ++i) { |
| result.push_back(footer->metadata(i).name()); |
| } |
| return result; |
| } |
| |
| std::string ReaderImpl::getMetadataValue(const std::string& key) const { |
| for(int i=0; i < footer->metadata_size(); ++i) { |
| if (footer->metadata(i).name() == key) { |
| return footer->metadata(i).value(); |
| } |
| } |
| throw std::range_error("key not found"); |
| } |
| |
| void ReaderImpl::getRowIndexStatistics(const proto::StripeInformation& stripeInfo, |
| uint64_t stripeIndex, const proto::StripeFooter& currentStripeFooter, |
| std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const { |
| int num_streams = currentStripeFooter.streams_size(); |
| uint64_t offset = stripeInfo.offset(); |
| uint64_t indexEnd = stripeInfo.offset() + stripeInfo.indexlength(); |
| for (int i = 0; i < num_streams; i++) { |
| const proto::Stream& stream = currentStripeFooter.streams(i); |
| StreamKind streamKind = static_cast<StreamKind>(stream.kind()); |
| uint64_t length = static_cast<uint64_t>(stream.length()); |
| if (streamKind == StreamKind::StreamKind_ROW_INDEX) { |
| if (offset + length > indexEnd) { |
| std::stringstream msg; |
| msg << "Malformed RowIndex stream meta in stripe " << stripeIndex |
| << ": streamOffset=" << offset << ", streamLength=" << length |
| << ", stripeOffset=" << stripeInfo.offset() << ", stripeIndexLength=" |
| << stripeInfo.indexlength(); |
| throw ParseError(msg.str()); |
| } |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(contents->compression, |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream(contents->stream.get(), |
| offset, |
| length, |
| *contents->pool)), |
| contents->blockSize, |
| *(contents->pool)); |
| |
| proto::RowIndex rowIndex; |
| if (!rowIndex.ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Failed to parse RowIndex from stripe footer"); |
| } |
| int num_entries = rowIndex.entry_size(); |
| size_t column = static_cast<size_t>(stream.column()); |
| for (int j = 0; j < num_entries; j++) { |
| const proto::RowIndexEntry& entry = rowIndex.entry(j); |
| (*indexStats)[column].push_back(entry.statistics()); |
| } |
| } |
| offset += length; |
| } |
| } |
| |
| bool ReaderImpl::hasMetadataValue(const std::string& key) const { |
| for(int i=0; i < footer->metadata_size(); ++i) { |
| if (footer->metadata(i).name() == key) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| const Type& ReaderImpl::getType() const { |
| return *(contents->schema.get()); |
| } |
| |
| std::unique_ptr<StripeStatistics> |
| ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const { |
| if (!isMetadataLoaded) { |
| readMetadata(); |
| } |
| if (metadata.get() == nullptr) { |
| throw std::logic_error("No stripe statistics in file"); |
| } |
| size_t num_cols = static_cast<size_t>( |
| metadata->stripestats( |
| static_cast<int>(stripeIndex)).colstats_size()); |
| std::vector<std::vector<proto::ColumnStatistics> > indexStats(num_cols); |
| |
| proto::StripeInformation currentStripeInfo = |
| footer->stripes(static_cast<int>(stripeIndex)); |
| proto::StripeFooter currentStripeFooter = |
| getStripeFooter(currentStripeInfo, *contents.get()); |
| |
| getRowIndexStatistics(currentStripeInfo, stripeIndex, currentStripeFooter, &indexStats); |
| |
| const Timezone& writerTZ = |
| currentStripeFooter.has_writertimezone() ? |
| getTimezoneByName(currentStripeFooter.writertimezone()) : |
| getLocalTimezone(); |
| StatContext statContext(hasCorrectStatistics(), &writerTZ); |
| return std::unique_ptr<StripeStatistics> |
| (new StripeStatisticsImpl(metadata->stripestats(static_cast<int>(stripeIndex)), |
| indexStats, statContext)); |
| } |
| |
| std::unique_ptr<Statistics> ReaderImpl::getStatistics() const { |
| StatContext statContext(hasCorrectStatistics()); |
| return std::unique_ptr<Statistics> |
| (new StatisticsImpl(*footer, statContext)); |
| } |
| |
| std::unique_ptr<ColumnStatistics> |
| ReaderImpl::getColumnStatistics(uint32_t index) const { |
| if (index >= static_cast<uint64_t>(footer->statistics_size())) { |
| throw std::logic_error("column index out of range"); |
| } |
| proto::ColumnStatistics col = |
| footer->statistics(static_cast<int32_t>(index)); |
| |
| StatContext statContext(hasCorrectStatistics()); |
| return std::unique_ptr<ColumnStatistics> (convertColumnStatistics(col, statContext)); |
| } |
| |
| void ReaderImpl::readMetadata() const { |
| uint64_t metadataSize = contents->postscript->metadatalength(); |
| uint64_t footerLength = contents->postscript->footerlength(); |
| if (fileLength < metadataSize + footerLength + postscriptLength + 1) { |
| std::stringstream msg; |
| msg << "Invalid Metadata length: fileLength=" << fileLength |
| << ", metadataLength=" << metadataSize << ", footerLength=" << footerLength |
| << ", postscriptLength=" << postscriptLength; |
| throw ParseError(msg.str()); |
| } |
| uint64_t metadataStart = fileLength - metadataSize - footerLength - postscriptLength - 1; |
| if (metadataSize != 0) { |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(contents->compression, |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream(contents->stream.get(), |
| metadataStart, |
| metadataSize, |
| *contents->pool)), |
| contents->blockSize, |
| *contents->pool); |
| metadata.reset(new proto::Metadata()); |
| if (!metadata->ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Failed to parse the metadata"); |
| } |
| } |
| isMetadataLoaded = true; |
| } |
| |
| bool ReaderImpl::hasCorrectStatistics() const { |
| return !WriterVersionImpl::VERSION_HIVE_8732().compareGT(getWriterVersion()); |
| } |
| |
| void ReaderImpl::checkOrcVersion() { |
| FileVersion version = getFormatVersion(); |
| if (version != FileVersion(0, 11) && version != FileVersion(0, 12)) { |
| *(options.getErrorStream()) |
| << "Warning: ORC file " << contents->stream->getName() |
| << " was written in an unknown format version " |
| << version.toString() << "\n"; |
| } |
| } |
| |
| std::unique_ptr<RowReader> ReaderImpl::createRowReader() const { |
| RowReaderOptions defaultOpts; |
| return createRowReader(defaultOpts); |
| } |
| |
| std::unique_ptr<RowReader> ReaderImpl::createRowReader( |
| const RowReaderOptions& opts) const { |
| return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts)); |
| } |
| |
| uint64_t maxStreamsForType(const proto::Type& type) { |
| switch (static_cast<int64_t>(type.kind())) { |
| case proto::Type_Kind_STRUCT: |
| return 1; |
| case proto::Type_Kind_INT: |
| case proto::Type_Kind_LONG: |
| case proto::Type_Kind_SHORT: |
| case proto::Type_Kind_FLOAT: |
| case proto::Type_Kind_DOUBLE: |
| case proto::Type_Kind_BOOLEAN: |
| case proto::Type_Kind_BYTE: |
| case proto::Type_Kind_DATE: |
| case proto::Type_Kind_LIST: |
| case proto::Type_Kind_MAP: |
| case proto::Type_Kind_UNION: |
| return 2; |
| case proto::Type_Kind_BINARY: |
| case proto::Type_Kind_DECIMAL: |
| case proto::Type_Kind_TIMESTAMP: |
| return 3; |
| case proto::Type_Kind_CHAR: |
| case proto::Type_Kind_STRING: |
| case proto::Type_Kind_VARCHAR: |
| return 4; |
| default: |
| return 0; |
| } |
| } |
| |
| uint64_t ReaderImpl::getMemoryUse(int stripeIx) { |
| std::vector<bool> selectedColumns; |
| selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), true); |
| return getMemoryUse(stripeIx, selectedColumns); |
| } |
| |
| uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) { |
| std::vector<bool> selectedColumns; |
| selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false); |
| ColumnSelector column_selector(contents.get()); |
| if (contents->schema->getKind() == STRUCT && include.begin() != include.end()) { |
| for(std::list<uint64_t>::const_iterator field = include.begin(); |
| field != include.end(); ++field) { |
| column_selector.updateSelectedByFieldId(selectedColumns, *field); |
| } |
| } else { |
| // default is to select all columns |
| std::fill(selectedColumns.begin(), selectedColumns.end(), true); |
| } |
| column_selector.selectParents(selectedColumns, *contents->schema.get()); |
| selectedColumns[0] = true; // column 0 is selected by default |
| return getMemoryUse(stripeIx, selectedColumns); |
| } |
| |
| uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) { |
| std::vector<bool> selectedColumns; |
| selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false); |
| ColumnSelector column_selector(contents.get()); |
| if (contents->schema->getKind() == STRUCT && names.begin() != names.end()) { |
| for(std::list<std::string>::const_iterator field = names.begin(); |
| field != names.end(); ++field) { |
| column_selector.updateSelectedByName(selectedColumns, *field); |
| } |
| } else { |
| // default is to select all columns |
| std::fill(selectedColumns.begin(), selectedColumns.end(), true); |
| } |
| column_selector.selectParents(selectedColumns, *contents->schema.get()); |
| selectedColumns[0] = true; // column 0 is selected by default |
| return getMemoryUse(stripeIx, selectedColumns); |
| } |
| |
| uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) { |
| std::vector<bool> selectedColumns; |
| selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false); |
| ColumnSelector column_selector(contents.get()); |
| if (include.begin() != include.end()) { |
| for(std::list<uint64_t>::const_iterator field = include.begin(); |
| field != include.end(); ++field) { |
| column_selector.updateSelectedByTypeId(selectedColumns, *field); |
| } |
| } else { |
| // default is to select all columns |
| std::fill(selectedColumns.begin(), selectedColumns.end(), true); |
| } |
| column_selector.selectParents(selectedColumns, *contents->schema.get()); |
| selectedColumns[0] = true; // column 0 is selected by default |
| return getMemoryUse(stripeIx, selectedColumns); |
| } |
| |
| uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) { |
| uint64_t maxDataLength = 0; |
| |
| if (stripeIx >= 0 && stripeIx < footer->stripes_size()) { |
| uint64_t stripe = footer->stripes(stripeIx).datalength(); |
| if (maxDataLength < stripe) { |
| maxDataLength = stripe; |
| } |
| } else { |
| for (int i=0; i < footer->stripes_size(); i++) { |
| uint64_t stripe = footer->stripes(i).datalength(); |
| if (maxDataLength < stripe) { |
| maxDataLength = stripe; |
| } |
| } |
| } |
| |
| bool hasStringColumn = false; |
| uint64_t nSelectedStreams = 0; |
| for (int i=0; !hasStringColumn && i < footer->types_size(); i++) { |
| if (selectedColumns[static_cast<size_t>(i)]) { |
| const proto::Type& type = footer->types(i); |
| nSelectedStreams += maxStreamsForType(type) ; |
| switch (static_cast<int64_t>(type.kind())) { |
| case proto::Type_Kind_CHAR: |
| case proto::Type_Kind_STRING: |
| case proto::Type_Kind_VARCHAR: |
| case proto::Type_Kind_BINARY: { |
| hasStringColumn = true; |
| break; |
| } |
| default: { |
| break; |
| } |
| } |
| } |
| } |
| |
| /* If a string column is read, use stripe datalength as a memory estimate |
| * because we don't know the dictionary size. Multiply by 2 because |
| * a string column requires two buffers: |
| * in the input stream and in the seekable input stream. |
| * If no string column is read, estimate from the number of streams. |
| */ |
| uint64_t memory = hasStringColumn ? 2 * maxDataLength : |
| std::min(uint64_t(maxDataLength), |
| nSelectedStreams * contents->stream->getNaturalReadSize()); |
| |
| // Do we need even more memory to read the footer or the metadata? |
| if (memory < contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS) { |
| memory = contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS; |
| } |
| if (memory < contents->postscript->metadatalength()) { |
| memory = contents->postscript->metadatalength(); |
| } |
| |
| // Account for firstRowOfStripe. |
| memory += static_cast<uint64_t>(footer->stripes_size()) * sizeof(uint64_t); |
| |
| // Decompressors need buffers for each stream |
| uint64_t decompressorMemory = 0; |
| if (contents->compression != CompressionKind_NONE) { |
| for (int i=0; i < footer->types_size(); i++) { |
| if (selectedColumns[static_cast<size_t>(i)]) { |
| const proto::Type& type = footer->types(i); |
| decompressorMemory += maxStreamsForType(type) * contents->blockSize; |
| } |
| } |
| if (contents->compression == CompressionKind_SNAPPY) { |
| decompressorMemory *= 2; // Snappy decompressor uses a second buffer |
| } |
| } |
| |
| return memory + decompressorMemory ; |
| } |
| |
| void RowReaderImpl::startNextStripe() { |
| reader.reset(); // ColumnReaders use lots of memory; free old memory first |
| currentStripeInfo = footer->stripes(static_cast<int>(currentStripe)); |
| uint64_t fileLength = contents->stream->getLength(); |
| if (currentStripeInfo.offset() + currentStripeInfo.indexlength() + |
| currentStripeInfo.datalength() + currentStripeInfo.footerlength() >= fileLength) { |
| std::stringstream msg; |
| msg << "Malformed StripeInformation at stripe index " << currentStripe << ": fileLength=" |
| << fileLength << ", StripeInfo=(offset=" << currentStripeInfo.offset() << ", indexLength=" |
| << currentStripeInfo.indexlength() << ", dataLength=" << currentStripeInfo.datalength() |
| << ", footerLength=" << currentStripeInfo.footerlength() << ")"; |
| throw ParseError(msg.str()); |
| } |
| currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get()); |
| rowsInCurrentStripe = currentStripeInfo.numberofrows(); |
| const Timezone& writerTimezone = |
| currentStripeFooter.has_writertimezone() ? |
| getTimezoneByName(currentStripeFooter.writertimezone()) : |
| localTimezone; |
| StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, |
| currentStripeFooter, |
| currentStripeInfo.offset(), |
| *(contents->stream.get()), |
| writerTimezone); |
| reader = buildReader(*contents->schema.get(), stripeStreams); |
| } |
| |
| bool RowReaderImpl::next(ColumnVectorBatch& data) { |
| if (currentStripe >= lastStripe) { |
| data.numElements = 0; |
| if (lastStripe > 0) { |
| previousRow = firstRowOfStripe[lastStripe - 1] + |
| footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows(); |
| } else { |
| previousRow = 0; |
| } |
| return false; |
| } |
| if (currentRowInStripe == 0) { |
| startNextStripe(); |
| } |
| uint64_t rowsToRead = |
| std::min(static_cast<uint64_t>(data.capacity), |
| rowsInCurrentStripe - currentRowInStripe); |
| data.numElements = rowsToRead; |
| if (enableEncodedBlock) { |
| reader->nextEncoded(data, rowsToRead, nullptr); |
| } |
| else { |
| reader->next(data, rowsToRead, nullptr); |
| } |
| // update row number |
| previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe; |
| currentRowInStripe += rowsToRead; |
| if (currentRowInStripe >= rowsInCurrentStripe) { |
| currentStripe += 1; |
| currentRowInStripe = 0; |
| } |
| return rowsToRead != 0; |
| } |
| |
| std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch |
| (uint64_t capacity) const { |
| return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock); |
| } |
| |
| void ensureOrcFooter(InputStream* stream, |
| DataBuffer<char> *buffer, |
| uint64_t postscriptLength) { |
| |
| const std::string MAGIC("ORC"); |
| const uint64_t magicLength = MAGIC.length(); |
| const char * const bufferStart = buffer->data(); |
| const uint64_t bufferLength = buffer->size(); |
| |
| if (postscriptLength < magicLength || bufferLength < magicLength) { |
| throw ParseError("Invalid ORC postscript length"); |
| } |
| const char* magicStart = bufferStart + bufferLength - 1 - magicLength; |
| |
| // Look for the magic string at the end of the postscript. |
| if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) { |
| // If there is no magic string at the end, check the beginning. |
| // Only files written by Hive 0.11.0 don't have the tail ORC string. |
| std::unique_ptr<char[]> frontBuffer( new char[magicLength] ); |
| stream->read(frontBuffer.get(), magicLength, 0); |
| bool foundMatch = memcmp(frontBuffer.get(), MAGIC.c_str(), magicLength) == 0; |
| |
| if (!foundMatch) { |
| throw ParseError("Not an ORC file"); |
| } |
| } |
| } |
| |
| /** |
| * Read the file's postscript from the given buffer. |
| * @param stream the file stream |
| * @param buffer the buffer with the tail of the file. |
| * @param postscriptSize the length of postscript in bytes |
| */ |
| std::unique_ptr<proto::PostScript> readPostscript(InputStream *stream, |
| DataBuffer<char> *buffer, |
| uint64_t postscriptSize) { |
| char *ptr = buffer->data(); |
| uint64_t readSize = buffer->size(); |
| |
| ensureOrcFooter(stream, buffer, postscriptSize); |
| |
| std::unique_ptr<proto::PostScript> postscript = |
| std::unique_ptr<proto::PostScript>(new proto::PostScript()); |
| if (readSize < 1 + postscriptSize) { |
| std::stringstream msg; |
| msg << "Invalid ORC postscript length: " << postscriptSize << ", file length = " |
| << stream->getLength(); |
| throw ParseError(msg.str()); |
| } |
| if (!postscript->ParseFromArray(ptr + readSize - 1 - postscriptSize, |
| static_cast<int>(postscriptSize))) { |
| throw ParseError("Failed to parse the postscript from " + |
| stream->getName()); |
| } |
| return REDUNDANT_MOVE(postscript); |
| } |
| |
| /** |
| * Check that proto Types are valid. Indices in the type tree should be valid, |
| * so we won't crash when we convert the proto::Types to TypeImpls (ORC-317). |
| * For STRUCT types, fieldName size should match subTypes size (ORC-581). |
| */ |
| void checkProtoTypes(const proto::Footer &footer) { |
| std::stringstream msg; |
| int maxId = footer.types_size(); |
| if (maxId <= 0) { |
| throw ParseError("Footer is corrupt: no types found"); |
| } |
| for (int i = 0; i < maxId; ++i) { |
| const proto::Type& type = footer.types(i); |
| if (type.kind() == proto::Type_Kind_STRUCT |
| && type.subtypes_size() != type.fieldnames_size()) { |
| msg << "Footer is corrupt: STRUCT type " << i << " has " << type.subtypes_size() |
| << " subTypes, but has " << type.fieldnames_size() << " fieldNames"; |
| throw ParseError(msg.str()); |
| } |
| for (int j = 0; j < type.subtypes_size(); ++j) { |
| int subTypeId = static_cast<int>(type.subtypes(j)); |
| if (subTypeId <= i) { |
| msg << "Footer is corrupt: malformed link from type " << i << " to " |
| << subTypeId; |
| throw ParseError(msg.str()); |
| } |
| if (subTypeId >= maxId) { |
| msg << "Footer is corrupt: types(" << subTypeId << ") not exists"; |
| throw ParseError(msg.str()); |
| } |
| if (j > 0 && static_cast<int>(type.subtypes(j - 1)) >= subTypeId) { |
| msg << "Footer is corrupt: subType(" << (j-1) << ") >= subType(" << j |
| << ") in types(" << i << "). (" << type.subtypes(j - 1) << " >= " |
| << subTypeId << ")"; |
| throw ParseError(msg.str()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Parse the footer from the given buffer. |
| * @param stream the file's stream |
| * @param buffer the buffer to parse the footer from |
| * @param footerOffset the offset within the buffer that contains the footer |
| * @param ps the file's postscript |
| * @param memoryPool the memory pool to use |
| */ |
| std::unique_ptr<proto::Footer> readFooter(InputStream* stream, |
| const DataBuffer<char> *buffer, |
| uint64_t footerOffset, |
| const proto::PostScript& ps, |
| MemoryPool& memoryPool) { |
| const char *footerPtr = buffer->data() + footerOffset; |
| |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(convertCompressionKind(ps), |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableArrayInputStream(footerPtr, |
| ps.footerlength())), |
| getCompressionBlockSize(ps), |
| memoryPool); |
| |
| std::unique_ptr<proto::Footer> footer = |
| std::unique_ptr<proto::Footer>(new proto::Footer()); |
| if (!footer->ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Failed to parse the footer from " + |
| stream->getName()); |
| } |
| |
| checkProtoTypes(*footer); |
| return REDUNDANT_MOVE(footer); |
| } |
| |
| std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream, |
| const ReaderOptions& options) { |
| std::shared_ptr<FileContents> contents = std::shared_ptr<FileContents>(new FileContents()); |
| contents->pool = options.getMemoryPool(); |
| contents->errorStream = options.getErrorStream(); |
| std::string serializedFooter = options.getSerializedFileTail(); |
| uint64_t fileLength; |
| uint64_t postscriptLength; |
| if (serializedFooter.length() != 0) { |
| // Parse the file tail from the serialized one. |
| proto::FileTail tail; |
| if (!tail.ParseFromString(serializedFooter)) { |
| throw ParseError("Failed to parse the file tail from string"); |
| } |
| contents->postscript.reset(new proto::PostScript(tail.postscript())); |
| contents->footer.reset(new proto::Footer(tail.footer())); |
| fileLength = tail.filelength(); |
| postscriptLength = tail.postscriptlength(); |
| } else { |
| // figure out the size of the file using the option or filesystem |
| fileLength = std::min(options.getTailLocation(), |
| static_cast<uint64_t>(stream->getLength())); |
| |
| //read last bytes into buffer to get PostScript |
| uint64_t readSize = std::min(fileLength, DIRECTORY_SIZE_GUESS); |
| if (readSize < 4) { |
| throw ParseError("File size too small"); |
| } |
| std::unique_ptr<DataBuffer<char>> buffer( new DataBuffer<char>(*contents->pool, readSize) ); |
| stream->read(buffer->data(), readSize, fileLength - readSize); |
| |
| postscriptLength = buffer->data()[readSize - 1] & 0xff; |
| contents->postscript = REDUNDANT_MOVE(readPostscript(stream.get(), |
| buffer.get(), postscriptLength)); |
| uint64_t footerSize = contents->postscript->footerlength(); |
| uint64_t tailSize = 1 + postscriptLength + footerSize; |
| if (tailSize >= fileLength) { |
| std::stringstream msg; |
| msg << "Invalid ORC tailSize=" << tailSize << ", fileLength=" << fileLength; |
| throw ParseError(msg.str()); |
| } |
| uint64_t footerOffset; |
| |
| if (tailSize > readSize) { |
| buffer->resize(footerSize); |
| stream->read(buffer->data(), footerSize, fileLength - tailSize); |
| footerOffset = 0; |
| } else { |
| footerOffset = readSize - tailSize; |
| } |
| |
| contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer.get(), |
| footerOffset, *contents->postscript, *contents->pool)); |
| } |
| contents->stream = std::move(stream); |
| return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents), |
| options, |
| fileLength, |
| postscriptLength)); |
| } |
| |
| std::map<uint32_t, BloomFilterIndex> |
| ReaderImpl::getBloomFilters(uint32_t stripeIndex, |
| const std::set<uint32_t>& included) const { |
| std::map<uint32_t, BloomFilterIndex> ret; |
| |
| // find stripe info |
| if (stripeIndex >= static_cast<uint32_t>(footer->stripes_size())) { |
| throw std::logic_error("Illegal stripe index: " + to_string(static_cast<int64_t>(stripeIndex))); |
| } |
| const proto::StripeInformation currentStripeInfo = |
| footer->stripes(static_cast<int>(stripeIndex)); |
| const proto::StripeFooter currentStripeFooter = |
| getStripeFooter(currentStripeInfo, *contents); |
| |
| // iterate stripe footer to get stream of bloomfilter |
| uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset()); |
| for (int i = 0; i < currentStripeFooter.streams_size(); i++) { |
| const proto::Stream& stream = currentStripeFooter.streams(i); |
| uint32_t column = static_cast<uint32_t>(stream.column()); |
| uint64_t length = static_cast<uint64_t>(stream.length()); |
| |
| // a bloom filter stream from a selected column is found |
| if (stream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8 && |
| (included.empty() || included.find(column) != included.end())) { |
| |
| std::unique_ptr<SeekableInputStream> pbStream = |
| createDecompressor(contents->compression, |
| std::unique_ptr<SeekableInputStream> |
| (new SeekableFileInputStream(contents->stream.get(), |
| offset, |
| length, |
| *contents->pool)), |
| contents->blockSize, |
| *(contents->pool)); |
| |
| proto::BloomFilterIndex pbBFIndex; |
| if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) { |
| throw ParseError("Failed to parse BloomFilterIndex"); |
| } |
| |
| BloomFilterIndex bfIndex; |
| for (int j = 0; j < pbBFIndex.bloomfilter_size(); j++) { |
| std::unique_ptr<BloomFilter> entry = BloomFilterUTF8Utils::deserialize( |
| stream.kind(), |
| currentStripeFooter.columns(static_cast<int>(stream.column())), |
| pbBFIndex.bloomfilter(j)); |
| bfIndex.entries.push_back(std::shared_ptr<BloomFilter>(std::move(entry))); |
| } |
| |
| // add bloom filters to result for one column |
| ret[column] = bfIndex; |
| } |
| |
| offset += length; |
| } |
| |
| return ret; |
| } |
| |
| RowReader::~RowReader() { |
| // PASS |
| } |
| |
| Reader::~Reader() { |
| // PASS |
| } |
| |
| InputStream::~InputStream() { |
| // PASS |
| }; |
| |
| |
| |
| }// namespace |