blob: e4ca4e29980ab075e629537484ad9dc9c9b46c2e [file] [log] [blame]
diff --git a/c++/include/orc/MemoryPool.hh b/c++/include/orc/MemoryPool.hh
index a914e5f26..efe1d4933 100644
--- a/c++/include/orc/MemoryPool.hh
+++ b/c++/include/orc/MemoryPool.hh
@@ -42,13 +42,15 @@ namespace orc {
uint64_t currentSize_;
// maximal capacity (actual allocated memory)
uint64_t currentCapacity_;
+ // flag to indicate whether it needs to manage buffer or not
+ bool ownBuffer_;
// not implemented
DataBuffer(DataBuffer& buffer);
DataBuffer& operator=(DataBuffer& buffer);
public:
- DataBuffer(MemoryPool& pool, uint64_t size = 0);
+ DataBuffer(MemoryPool& pool, uint64_t size = 0, bool ownBuf = true);
DataBuffer(DataBuffer<T>&& buffer) noexcept;
@@ -81,6 +83,10 @@ namespace orc {
void reserve(uint64_t size);
void resize(uint64_t size);
void zeroOut();
+
+ // set external buffer
+ void setData(T* buf, size_t bufSize);
+
};
// Specializations for char
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index b015b6491..585e50ec5 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -659,6 +659,9 @@ namespace orc {
virtual void preBuffer(const std::vector<uint32_t>& stripes,
const std::list<uint64_t>& includeTypes) = 0;
+ virtual std::vector<std::pair<uint64_t, uint64_t>> preBufferRange(
+ const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) = 0;
+
/**
* Release cached entries whose right boundary is less than or equal to the given boundary.
* @param boundary the boundary value to release cache entries
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index af434c37c..08393259c 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -332,7 +332,13 @@ namespace orc {
nanoBuffer[i] *= 10;
}
}
+
+ // ORC-306: compensate -1s for JDK bug in java.sql.Timestamp
int64_t writerTime = secsBuffer[i] + epochOffset_;
+ if (writerTime < 0 && nanoBuffer[i] > 999999) {
+ writerTime -= 1;
+ }
+
if (!sameTimezone_) {
// adjust timestamp value to same wall clock time if writer and reader
// time zones have different rules, which is required for Apache Orc.
@@ -347,9 +353,6 @@ namespace orc {
}
}
secsBuffer[i] = writerTime;
- if (secsBuffer[i] < 0 && nanoBuffer[i] > 999999) {
- secsBuffer[i] -= 1;
- }
}
}
}
diff --git a/c++/src/MemoryPool.cc b/c++/src/MemoryPool.cc
index ed7fee737..a8ee8a67c 100644
--- a/c++/src/MemoryPool.cc
+++ b/c++/src/MemoryPool.cc
@@ -52,8 +52,8 @@ namespace orc {
}
template <class T>
- DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize)
- : memoryPool_(pool), buf_(nullptr), currentSize_(0), currentCapacity_(0) {
+ DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize, bool ownBuf)
+ : memoryPool_(pool), buf_(nullptr), currentSize_(0), currentCapacity_(0), ownBuffer_(ownBuf) {
reserve(newSize);
currentSize_ = newSize;
}
@@ -63,24 +63,35 @@ namespace orc {
: memoryPool_(buffer.memoryPool_),
buf_(buffer.buf_),
currentSize_(buffer.currentSize_),
- currentCapacity_(buffer.currentCapacity_) {
- buffer.buf_ = nullptr;
- buffer.currentSize_ = 0;
- buffer.currentCapacity_ = 0;
+ currentCapacity_(buffer.currentCapacity_),
+ ownBuffer_(buffer.ownBuffer_) {
+ if (buffer.ownBuffer_) {
+ buffer.buf_ = nullptr;
+ buffer.currentSize_ = 0;
+ buffer.currentCapacity_ = 0;
+ }
}
template <class T>
DataBuffer<T>::~DataBuffer() {
+ if (!ownBuffer_) {
+ return;
+ }
for (uint64_t i = currentSize_; i > 0; --i) {
(buf_ + i - 1)->~T();
}
if (buf_) {
+ static_assert(std::is_trivially_copyable<T>::value,
+ "Only trivially copyable type is supported for DataBuffer Reserve");
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <class T>
void DataBuffer<T>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
reserve(newSize);
if (currentSize_ > newSize) {
for (uint64_t i = currentSize_; i > newSize; --i) {
@@ -96,6 +107,9 @@ namespace orc {
template <class T>
void DataBuffer<T>::reserve(uint64_t newCapacity) {
+ if (!ownBuffer_) {
+ return;
+ }
if (newCapacity > currentCapacity_ || !buf_) {
if (buf_) {
T* buf_old = buf_;
@@ -114,6 +128,18 @@ namespace orc {
memset(buf_, 0, sizeof(T) * currentCapacity_);
}
+ template <class T>
+ void DataBuffer<T>::setData(T* buffer, size_t bufSize) {
+ if (ownBuffer_ && buf_) {
+ static_assert(std::is_trivially_copyable<T>::value,
+ "Only trivially copyable type is supported for DataBuffer Reserve");
+ memoryPool_.free(reinterpret_cast<char*>(buf_));
+ }
+ ownBuffer_ = false;
+ buf_ = buffer;
+ currentSize_ = currentCapacity_ = bufSize / sizeof(T);
+ }
+
// Specializations for Int128
template <>
void DataBuffer<Int128>::zeroOut() {
@@ -126,13 +152,16 @@ namespace orc {
template <>
DataBuffer<char>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<char>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, newSize - currentSize_);
@@ -144,13 +173,16 @@ namespace orc {
template <>
DataBuffer<char*>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<char*>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(char*));
@@ -162,13 +194,16 @@ namespace orc {
template <>
DataBuffer<double>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<double>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(double));
@@ -180,13 +215,16 @@ namespace orc {
template <>
DataBuffer<float>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<float>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(float));
@@ -198,13 +236,17 @@ namespace orc {
template <>
DataBuffer<int64_t>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<int64_t>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
+
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int64_t));
@@ -216,13 +258,17 @@ namespace orc {
template <>
DataBuffer<int32_t>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<int32_t>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
+
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int32_t));
@@ -234,13 +280,17 @@ namespace orc {
template <>
DataBuffer<int16_t>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<int16_t>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
+
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int16_t));
@@ -252,13 +302,17 @@ namespace orc {
template <>
DataBuffer<int8_t>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<int8_t>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
+
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int8_t));
@@ -270,13 +324,17 @@ namespace orc {
template <>
DataBuffer<uint64_t>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<uint64_t>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
+
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(uint64_t));
@@ -288,13 +346,17 @@ namespace orc {
template <>
DataBuffer<unsigned char>::~DataBuffer() {
- if (buf_) {
+ if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}
template <>
void DataBuffer<unsigned char>::resize(uint64_t newSize) {
+ if (!ownBuffer_) {
+ return;
+ }
+
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, newSize - currentSize_);
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index c93c62f6c..2a821b622 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1531,8 +1531,8 @@ namespace orc {
}
}
- void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
- const std::list<uint64_t>& includeTypes) {
+ std::vector<std::pair<uint64_t, uint64_t>> ReaderImpl::preBufferRange(
+ const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) {
std::vector<uint32_t> newStripes;
for (auto stripe : stripes) {
if (stripe < static_cast<uint32_t>(footer_->stripes_size())) newStripes.push_back(stripe);
@@ -1544,7 +1544,7 @@ namespace orc {
}
if (newStripes.empty() || newIncludeTypes.empty()) {
- return;
+ return {};
}
orc::RowReaderOptions rowReaderOptions;
@@ -1553,7 +1553,7 @@ namespace orc {
std::vector<bool> selectedColumns;
columnSelector.updateSelected(selectedColumns, rowReaderOptions);
- std::vector<ReadRange> ranges;
+ std::vector<std::pair<uint64_t, uint64_t>> ranges;
ranges.reserve(newIncludeTypes.size());
for (auto stripe : newStripes) {
// get stripe information
@@ -1598,17 +1598,23 @@ namespace orc {
offset += stream.length();
}
+ }
+ return ranges;
+ }
- {
- std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
-
- if (!contents_->readCache) {
- contents_->readCache = std::make_shared<ReadRangeCache>(
- getStream(), options_.getCacheOptions(), contents_->pool, contents_->readerMetrics);
- }
- contents_->readCache->cache(std::move(ranges));
- }
+ void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
+ const std::list<uint64_t>& includeTypes) {
+ auto ranges = preBufferRange(stripes, includeTypes);
+ std::vector<ReadRange> read_ranges;
+ for (const auto& range : ranges) {
+ read_ranges.emplace_back(range.first, range.second);
+ }
+ std::lock_guard<std::mutex> lock(contents_->readCacheMutex);
+ if (!contents_->readCache) {
+ contents_->readCache = std::make_shared<ReadRangeCache>(
+ getStream(), options_.getCacheOptions(), contents_->pool, contents_->readerMetrics);
}
+ contents_->readCache->cache(std::move(read_ranges));
}
RowReader::~RowReader() {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 39ca73967..13da45a49 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -387,6 +387,9 @@ namespace orc {
std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const override;
+ std::vector<std::pair<uint64_t, uint64_t>> preBufferRange(
+ const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) override;
+
void preBuffer(const std::vector<uint32_t>& stripes,
const std::list<uint64_t>& includeTypes) override;
void releaseBuffer(uint64_t boundary) override;
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index 9b2c829c7..434841224 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -19,6 +19,8 @@ set(ORC_VENDOR_DEPENDENCIES)
set(ORC_SYSTEM_DEPENDENCIES)
set(ORC_INSTALL_INTERFACE_TARGETS)
+set(BUILD_POSITION_INDEPENDENT_LIB ON)
+
set(ORC_FORMAT_VERSION "1.0.0")
set(LZ4_VERSION "1.10.0")
set(SNAPPY_VERSION "1.2.1")