[Feature] Add input stream of stripe streams in stripe reader.
diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
index beae9dd..9df0b0e 100644
--- a/c++/include/orc/Common.hh
+++ b/c++/include/orc/Common.hh
@@ -156,6 +156,46 @@
std::string columnEncodingKindToString(ColumnEncodingKind kind);
+ /**
+  * Identifies a single stream inside a stripe by the id of the column it
+  * belongs to and its StreamKind. Offers hash() and operator== for use as an
+  * unordered-container key, and operator< for use as an ordered-container key.
+  */
+ class StreamId {
+  public:
+   StreamId(uint64_t columnId, StreamKind streamKind)
+       : _columnId(columnId), _streamKind(streamKind) {}
+
+   /**
+    * Combined hash of the column id and the stream kind.
+    *
+    * std::hash of an integer is the identity on common standard libraries,
+    * so a bare `h1 ^ (h2 << 1)` collapses neighbouring keys such as
+    * (column 0, kind 1) and (column 2, kind 0) onto the same value. The
+    * mixing step below follows the boost::hash_combine recipe to spread
+    * such keys apart.
+    */
+   size_t hash() const {
+     size_t seed = std::hash<uint64_t>{}(_columnId);
+     const size_t kindHash = std::hash<int>{}(static_cast<int>(_streamKind));
+     seed ^= kindHash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+     return seed;
+   }
+
+   /** Two ids are equal when both the column id and the stream kind match. */
+   bool operator==(const StreamId& other) const {
+     return _columnId == other._columnId && _streamKind == other._streamKind;
+   }
+
+   /** Orders by column id first, then by the numeric value of the stream kind. */
+   bool operator<(const StreamId& other) const {
+     if (_columnId != other._columnId) {
+       return _columnId < other._columnId;
+     }
+     return static_cast<int>(_streamKind) < static_cast<int>(other._streamKind);
+   }
+
+   /** Renders the id as "[columnId=<id>, streamKind=<numeric kind>]". */
+   std::string toString() const {
+     std::ostringstream oss;
+     oss << "[columnId=" << _columnId << ", streamKind=" << static_cast<int>(_streamKind) << "]";
+     return oss.str();
+   }
+
+   /** Id of the column this stream belongs to. */
+   uint64_t columnId() const {
+     return _columnId;
+   }
+   /** Kind of the stream. */
+   StreamKind streamKind() const {
+     return _streamKind;
+   }
+
+  private:
+   uint64_t _columnId;
+   StreamKind _streamKind;
+ };
+
class StripeInformation {
public:
virtual ~StripeInformation();
@@ -306,4 +346,13 @@
} // namespace orc
+namespace std {
+ // Lets orc::StreamId be used as the key of std::unordered_map and
+ // std::unordered_set by forwarding to StreamId::hash().
+ template <>
+ struct hash<orc::StreamId> {
+   size_t operator()(const orc::StreamId& streamId) const {
+     const size_t digest = streamId.hash();
+     return digest;
+   }
+ };
+} // namespace std
+
#endif
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index c52b66b..c7826e1 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -63,8 +63,10 @@
*/
virtual const std::string& getName() const = 0;
- virtual void beforeReadStripe(std::unique_ptr<StripeInformation> currentStripeInformation,
- std::vector<bool> selectedColumns);
+ /**
+  * Hook the row reader calls on the file's input stream before it builds the
+  * stream set for a stripe.
+  * @param currentStripeInformation description of the stripe about to be read;
+  *        ownership is transferred to the callee.
+  * @param selectedColumns per-column selection flags, passed by value.
+  * @param streams map keyed by (column id, stream kind), passed by non-const
+  *        reference. The reader afterwards looks each stream up in this map
+  *        and reads through the mapped InputStream when an entry exists,
+  *        otherwise through the file's own stream. Presumably an override
+  *        fills it in -- TODO confirm against the implementing subclasses.
+  */
+ virtual void beforeReadStripe(
+ std::unique_ptr<StripeInformation> currentStripeInformation,
+ std::vector<bool> selectedColumns,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams);
};
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index b8e1a91..564c1f2 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -549,11 +549,14 @@
if (selectedColumns[colId] && pbStream.has_kind() &&
(pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
- std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
- getCompression(),
- std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
- contents->stream.get(), offset, pbStream.length(), *contents->pool)),
- getCompressionSize(), *contents->pool, contents->readerMetrics);
+ auto iter = streams.find({colId, static_cast<StreamKind>(pbStream.kind())});
+ InputStream* inputStream =
+ (iter != streams.end()) ? iter->second.get() : contents->stream.get();
+ std::unique_ptr<SeekableInputStream> inStream =
+ createDecompressor(getCompression(),
+ std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
+ inputStream, offset, pbStream.length(), *contents->pool)),
+ getCompressionSize(), *contents->pool, contents->readerMetrics);
if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
proto::RowIndex rowIndex;
@@ -951,7 +954,7 @@
readMetadata();
}
- std::vector<int> allStripesNeeded(numberOfStripes,1);
+ std::vector<int> allStripesNeeded(numberOfStripes, 1);
if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
auto sargs = opts.getSearchArgument();
@@ -963,19 +966,20 @@
return allStripesNeeded;
}
- for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) {
+ for (uint64_t currentStripeIndex = 0; currentStripeIndex < numberOfStripes;
+ currentStripeIndex++) {
const auto& currentStripeStats =
- contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
- //Not need add mMetrics,so use 0.
- allStripesNeeded[currentStripeIndex] = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);;
+ contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
+ // No need to add to mMetrics here, so use 0.
+ allStripesNeeded[currentStripeIndex] =
+     sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);
}
contents->sargsApplier = std::move(sargsApplier);
}
return allStripesNeeded;
}
-
-
uint64_t maxStreamsForType(const proto::Type& type) {
switch (static_cast<int64_t>(type.kind())) {
case proto::Type_Kind_STRUCT:
@@ -1161,6 +1165,7 @@
reader.reset(); // ColumnReaders use lots of memory; free old memory first
rowIndexes.clear();
bloomFilterIndex.clear();
+ streams.clear();
followRowInStripe = 0;
// evaluate file statistics if it exists
@@ -1189,13 +1194,13 @@
if (sargsApplier) {
bool isStripeNeeded = true;
if (contents->metadata) {
- const auto &currentStripeStats =
- contents->metadata->stripestats(static_cast<int>(currentStripe));
+ const auto& currentStripeStats =
+ contents->metadata->stripestats(static_cast<int>(currentStripe));
// skip this stripe after stats fail to satisfy sargs
uint64_t stripeRowGroupCount =
- (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride();
+ (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride();
isStripeNeeded =
- sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
+ sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
}
if (!isStripeNeeded) {
// advance to next stripe when current stripe has no matching rows
@@ -1209,11 +1214,12 @@
processingStripe = currentStripe;
std::unique_ptr<StripeInformation> currentStripeInformation(new StripeInformationImpl(
- currentStripeInfo.offset(), currentStripeInfo.indexlength(),
- currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
- currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool,
- contents->compression, contents->blockSize, contents->readerMetrics));
- contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns);
+ currentStripeInfo.offset(), currentStripeInfo.indexlength(),
+ currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
+ currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool,
+ contents->compression, contents->blockSize, contents->readerMetrics));
+ contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns,
+ streams);
if (sargsApplier) {
bool isStripeNeeded = true;
@@ -1237,8 +1243,8 @@
? getTimezoneByName(currentStripeFooter.writertimezone())
: localTimezone;
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter,
- currentStripeInfo.offset(), *contents->stream, writerTimezone,
- readerTimezone);
+ currentStripeInfo.offset(), *contents->stream, streams,
+ writerTimezone, readerTimezone);
reader = buildReader(*contents->schema, stripeStreams, useTightNumericVector);
if (stringDictFilter != nullptr) {
@@ -1760,7 +1766,9 @@
// PASS
};
- void InputStream::beforeReadStripe(std::unique_ptr<StripeInformation> currentStripeInformation,
- std::vector<bool> selectedColumns) {}
+ // Default implementation is a no-op: all arguments are ignored and `streams`
+ // is left unmodified.
+ void InputStream::beforeReadStripe(
+ std::unique_ptr<StripeInformation> currentStripeInformation,
+ std::vector<bool> selectedColumns,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {}
} // namespace orc
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 9505022..7ec049a 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -162,6 +162,7 @@
// contents
std::shared_ptr<FileContents> contents;
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>> streams;
const bool throwOnHive11DecimalOverflow;
const int32_t forcedScaleOnHive11Decimal;
@@ -322,10 +323,9 @@
// internal methods
void readMetadata() const;
void checkOrcVersion();
- void getRowIndexStatistics(
- const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
- const proto::StripeFooter& currentStripeFooter,
- std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
+ void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
+ const proto::StripeFooter& currentStripeFooter,
+ std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;
// metadata
mutable bool isMetadataLoaded;
@@ -425,8 +425,8 @@
return contents->stream.get();
}
- void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{
- contents->stream = std::move(inputStreamUPtr);
+ // Replaces contents->stream with the given stream, taking ownership of it.
+ void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override {
+ contents->stream = std::move(inputStreamUPtr);
+ }
uint64_t getMemoryUse(int stripeIx = -1) override;
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index 1f43da4..8efa23e 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -25,17 +25,18 @@
namespace orc {
- StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader, uint64_t _index,
- const proto::StripeInformation& _stripeInfo,
- const proto::StripeFooter& _footer, uint64_t _stripeStart,
- InputStream& _input, const Timezone& _writerTimezone,
- const Timezone& _readerTimezone)
+ StripeStreamsImpl::StripeStreamsImpl(
+ const RowReaderImpl& _reader, uint64_t _index, const proto::StripeInformation& _stripeInfo,
+ const proto::StripeFooter& _footer, uint64_t _stripeStart, InputStream& _input,
+ const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& _streams,
+ const Timezone& _writerTimezone, const Timezone& _readerTimezone)
: reader(_reader),
stripeInfo(_stripeInfo),
footer(_footer),
stripeIndex(_index),
stripeStart(_stripeStart),
input(_input),
+ streams(_streams),
writerTimezone(_writerTimezone),
readerTimezone(_readerTimezone) {
// PASS
@@ -87,8 +88,10 @@
const proto::Stream& stream = footer.streams(i);
if (stream.has_kind() && stream.kind() == kind &&
stream.column() == static_cast<uint64_t>(columnId)) {
+ auto iter = streams.find({columnId, static_cast<StreamKind>(kind)});
+ InputStream* inputStream = (iter != streams.end()) ? iter->second.get() : &input;
uint64_t streamLength = stream.length();
- uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : streamLength;
+ uint64_t myBlock = shouldStream ? inputStream->getNaturalReadSize() : streamLength;
if (offset + streamLength > dataEnd) {
std::stringstream msg;
msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex
@@ -100,7 +103,7 @@
}
return createDecompressor(reader.getCompression(),
std::make_unique<SeekableFileInputStream>(
- &input, offset, stream.length(), *pool, myBlock),
+ inputStream, offset, stream.length(), *pool, myBlock),
reader.getCompressionSize(), *pool,
reader.getFileContents().readerMetrics);
}
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 74bebda..57e51ef 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -43,14 +43,16 @@
const uint64_t stripeIndex;
const uint64_t stripeStart;
InputStream& input;
+ const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams;
const Timezone& writerTimezone;
const Timezone& readerTimezone;
public:
- StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
- const proto::StripeInformation& stripeInfo, const proto::StripeFooter& footer,
- uint64_t stripeStart, InputStream& input, const Timezone& writerTimezone,
- const Timezone& readerTimezone);
+ /**
+  * @param input stream used for any stripe stream that has no entry in
+  *        `streams`.
+  * @param streams per-stream inputs keyed by (column id, stream kind). The map
+  *        is held by const reference, not copied, so it must outlive this
+  *        object.
+  * The remaining arguments are stored in the members of the same name.
+  */
+ StripeStreamsImpl(
+ const RowReaderImpl& reader, uint64_t index, const proto::StripeInformation& stripeInfo,
+ const proto::StripeFooter& footer, uint64_t stripeStart, InputStream& input,
+ const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams,
+ const Timezone& writerTimezone, const Timezone& readerTimezone);
virtual ~StripeStreamsImpl() override;