| diff --git a/c++/include/orc/MemoryPool.hh b/c++/include/orc/MemoryPool.hh |
| index a914e5f26..efe1d4933 100644 |
| --- a/c++/include/orc/MemoryPool.hh |
| +++ b/c++/include/orc/MemoryPool.hh |
| @@ -42,13 +42,15 @@ namespace orc { |
| uint64_t currentSize_; |
| // maximal capacity (actual allocated memory) |
| uint64_t currentCapacity_; |
| + // flag to indicate whether it needs to manage buffer or not |
| + bool ownBuffer_; |
| |
| // not implemented |
| DataBuffer(DataBuffer& buffer); |
| DataBuffer& operator=(DataBuffer& buffer); |
| |
| public: |
| - DataBuffer(MemoryPool& pool, uint64_t size = 0); |
| + DataBuffer(MemoryPool& pool, uint64_t size = 0, bool ownBuf = true); |
| |
| DataBuffer(DataBuffer<T>&& buffer) noexcept; |
| |
| @@ -81,6 +83,10 @@ namespace orc { |
| void reserve(uint64_t size); |
| void resize(uint64_t size); |
| void zeroOut(); |
| + |
| + // set external buffer |
| + void setData(T* buf, size_t bufSize); |
| + |
| }; |
| |
| // Specializations for char |
| diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh |
| index b015b6491..585e50ec5 100644 |
| --- a/c++/include/orc/Reader.hh |
| +++ b/c++/include/orc/Reader.hh |
| @@ -659,6 +659,9 @@ namespace orc { |
| virtual void preBuffer(const std::vector<uint32_t>& stripes, |
| const std::list<uint64_t>& includeTypes) = 0; |
| |
| + virtual std::vector<std::pair<uint64_t, uint64_t>> preBufferRange( |
| + const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) = 0; |
| + |
| /** |
| * Release cached entries whose right boundary is less than or equal to the given boundary. |
| * @param boundary the boundary value to release cache entries |
| diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc |
| index af434c37c..08393259c 100644 |
| --- a/c++/src/ColumnReader.cc |
| +++ b/c++/src/ColumnReader.cc |
| @@ -332,7 +332,13 @@ namespace orc { |
| nanoBuffer[i] *= 10; |
| } |
| } |
| + |
| + // ORC-306: compensate -1s for JDK bug in java.sql.Timestamp |
| int64_t writerTime = secsBuffer[i] + epochOffset_; |
| + if (writerTime < 0 && nanoBuffer[i] > 999999) { |
| + writerTime -= 1; |
| + } |
| + |
| if (!sameTimezone_) { |
| // adjust timestamp value to same wall clock time if writer and reader |
| // time zones have different rules, which is required for Apache Orc. |
| @@ -347,9 +353,6 @@ namespace orc { |
| } |
| } |
| secsBuffer[i] = writerTime; |
| - if (secsBuffer[i] < 0 && nanoBuffer[i] > 999999) { |
| - secsBuffer[i] -= 1; |
| - } |
| } |
| } |
| } |
| diff --git a/c++/src/MemoryPool.cc b/c++/src/MemoryPool.cc |
| index ed7fee737..a8ee8a67c 100644 |
| --- a/c++/src/MemoryPool.cc |
| +++ b/c++/src/MemoryPool.cc |
| @@ -52,8 +52,8 @@ namespace orc { |
| } |
| |
| template <class T> |
| - DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize) |
| - : memoryPool_(pool), buf_(nullptr), currentSize_(0), currentCapacity_(0) { |
| + DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize, bool ownBuf) |
| + : memoryPool_(pool), buf_(nullptr), currentSize_(0), currentCapacity_(0), ownBuffer_(ownBuf) { |
| reserve(newSize); |
| currentSize_ = newSize; |
| } |
| @@ -63,24 +63,35 @@ namespace orc { |
| : memoryPool_(buffer.memoryPool_), |
| buf_(buffer.buf_), |
| currentSize_(buffer.currentSize_), |
| - currentCapacity_(buffer.currentCapacity_) { |
| - buffer.buf_ = nullptr; |
| - buffer.currentSize_ = 0; |
| - buffer.currentCapacity_ = 0; |
| + currentCapacity_(buffer.currentCapacity_), |
| + ownBuffer_(buffer.ownBuffer_) { |
| + if (buffer.ownBuffer_) { |
| + buffer.buf_ = nullptr; |
| + buffer.currentSize_ = 0; |
| + buffer.currentCapacity_ = 0; |
| + } |
| } |
| |
| template <class T> |
| DataBuffer<T>::~DataBuffer() { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| for (uint64_t i = currentSize_; i > 0; --i) { |
| (buf_ + i - 1)->~T(); |
| } |
| if (buf_) { |
| + static_assert(std::is_trivially_copyable<T>::value, |
| + "Only trivially copyable type is supported for DataBuffer Reserve"); |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <class T> |
| void DataBuffer<T>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| reserve(newSize); |
| if (currentSize_ > newSize) { |
| for (uint64_t i = currentSize_; i > newSize; --i) { |
| @@ -96,6 +107,9 @@ namespace orc { |
| |
| template <class T> |
| void DataBuffer<T>::reserve(uint64_t newCapacity) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| if (newCapacity > currentCapacity_ || !buf_) { |
| if (buf_) { |
| T* buf_old = buf_; |
| @@ -114,6 +128,18 @@ namespace orc { |
| memset(buf_, 0, sizeof(T) * currentCapacity_); |
| } |
| |
| + template <class T> |
| + void DataBuffer<T>::setData(T* buffer, size_t bufSize) { |
| + if (ownBuffer_ && buf_) { |
| + static_assert(std::is_trivially_copyable<T>::value, |
| + "Only trivially copyable type is supported for DataBuffer Reserve"); |
| + memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| + } |
| + ownBuffer_ = false; |
| + buf_ = buffer; |
| + currentSize_ = currentCapacity_ = bufSize / sizeof(T); |
| + } |
| + |
| // Specializations for Int128 |
| template <> |
| void DataBuffer<Int128>::zeroOut() { |
| @@ -126,13 +152,16 @@ namespace orc { |
| |
| template <> |
| DataBuffer<char>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<char>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, newSize - currentSize_); |
| @@ -144,13 +173,16 @@ namespace orc { |
| |
| template <> |
| DataBuffer<char*>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<char*>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(char*)); |
| @@ -162,13 +194,16 @@ namespace orc { |
| |
| template <> |
| DataBuffer<double>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<double>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(double)); |
| @@ -180,13 +215,16 @@ namespace orc { |
| |
| template <> |
| DataBuffer<float>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<float>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(float)); |
| @@ -198,13 +236,17 @@ namespace orc { |
| |
| template <> |
| DataBuffer<int64_t>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<int64_t>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| + |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int64_t)); |
| @@ -216,13 +258,17 @@ namespace orc { |
| |
| template <> |
| DataBuffer<int32_t>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<int32_t>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| + |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int32_t)); |
| @@ -234,13 +280,17 @@ namespace orc { |
| |
| template <> |
| DataBuffer<int16_t>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<int16_t>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| + |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int16_t)); |
| @@ -252,13 +302,17 @@ namespace orc { |
| |
| template <> |
| DataBuffer<int8_t>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<int8_t>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| + |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int8_t)); |
| @@ -270,13 +324,17 @@ namespace orc { |
| |
| template <> |
| DataBuffer<uint64_t>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<uint64_t>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| + |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(uint64_t)); |
| @@ -288,13 +346,17 @@ namespace orc { |
| |
| template <> |
| DataBuffer<unsigned char>::~DataBuffer() { |
| - if (buf_) { |
| + if (ownBuffer_ && buf_) { |
| memoryPool_.free(reinterpret_cast<char*>(buf_)); |
| } |
| } |
| |
| template <> |
| void DataBuffer<unsigned char>::resize(uint64_t newSize) { |
| + if (!ownBuffer_) { |
| + return; |
| + } |
| + |
| reserve(newSize); |
| if (newSize > currentSize_) { |
| memset(buf_ + currentSize_, 0, newSize - currentSize_); |
| diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc |
| index c93c62f6c..2a821b622 100644 |
| --- a/c++/src/Reader.cc |
| +++ b/c++/src/Reader.cc |
| @@ -1531,8 +1531,8 @@ namespace orc { |
| } |
| } |
| |
| - void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes, |
| - const std::list<uint64_t>& includeTypes) { |
| + std::vector<std::pair<uint64_t, uint64_t>> ReaderImpl::preBufferRange( |
| + const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) { |
| std::vector<uint32_t> newStripes; |
| for (auto stripe : stripes) { |
| if (stripe < static_cast<uint32_t>(footer_->stripes_size())) newStripes.push_back(stripe); |
| @@ -1544,7 +1544,7 @@ namespace orc { |
| } |
| |
| if (newStripes.empty() || newIncludeTypes.empty()) { |
| - return; |
| + return {}; |
| } |
| |
| orc::RowReaderOptions rowReaderOptions; |
| @@ -1553,7 +1553,7 @@ namespace orc { |
| std::vector<bool> selectedColumns; |
| columnSelector.updateSelected(selectedColumns, rowReaderOptions); |
| |
| - std::vector<ReadRange> ranges; |
| + std::vector<std::pair<uint64_t, uint64_t>> ranges; |
| ranges.reserve(newIncludeTypes.size()); |
| for (auto stripe : newStripes) { |
| // get stripe information |
| @@ -1598,17 +1598,23 @@ namespace orc { |
| |
| offset += stream.length(); |
| } |
| + } |
| + return ranges; |
| + } |
| |
| - { |
| - std::lock_guard<std::mutex> lock(contents_->readCacheMutex); |
| - |
| - if (!contents_->readCache) { |
| - contents_->readCache = std::make_shared<ReadRangeCache>( |
| - getStream(), options_.getCacheOptions(), contents_->pool, contents_->readerMetrics); |
| - } |
| - contents_->readCache->cache(std::move(ranges)); |
| - } |
| + void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes, |
| + const std::list<uint64_t>& includeTypes) { |
| + auto ranges = preBufferRange(stripes, includeTypes); |
| + std::vector<ReadRange> read_ranges; |
| + for (const auto& range : ranges) { |
| + read_ranges.emplace_back(range.first, range.second); |
| + } |
| + std::lock_guard<std::mutex> lock(contents_->readCacheMutex); |
| + if (!contents_->readCache) { |
| + contents_->readCache = std::make_shared<ReadRangeCache>( |
| + getStream(), options_.getCacheOptions(), contents_->pool, contents_->readerMetrics); |
| } |
| + contents_->readCache->cache(std::move(read_ranges)); |
| } |
| |
| RowReader::~RowReader() { |
| diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh |
| index 39ca73967..13da45a49 100644 |
| --- a/c++/src/Reader.hh |
| +++ b/c++/src/Reader.hh |
| @@ -387,6 +387,9 @@ namespace orc { |
| std::map<uint32_t, BloomFilterIndex> getBloomFilters( |
| uint32_t stripeIndex, const std::set<uint32_t>& included) const override; |
| |
| + std::vector<std::pair<uint64_t, uint64_t>> preBufferRange( |
| + const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) override; |
| + |
| void preBuffer(const std::vector<uint32_t>& stripes, |
| const std::list<uint64_t>& includeTypes) override; |
| void releaseBuffer(uint64_t boundary) override; |
| diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake |
| index 9b2c829c7..434841224 100644 |
| --- a/cmake_modules/ThirdpartyToolchain.cmake |
| +++ b/cmake_modules/ThirdpartyToolchain.cmake |
| @@ -19,6 +19,8 @@ set(ORC_VENDOR_DEPENDENCIES) |
| set(ORC_SYSTEM_DEPENDENCIES) |
| set(ORC_INSTALL_INTERFACE_TARGETS) |
| |
| +set(BUILD_POSITION_INDEPENDENT_LIB ON) |
| + |
| set(ORC_FORMAT_VERSION "1.0.0") |
| set(LZ4_VERSION "1.10.0") |
| set(SNAPPY_VERSION "1.2.1") |