| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
#include "orc/Int128.hh"
#include "orc/Writer.hh"

#include "ByteRLE.hh"
#include "ColumnWriter.hh"
#include "RLE.hh"
#include "Statistics.hh"
#include "Timezone.hh"

#include <cstring>
| |
| namespace orc { |
| StreamsFactory::~StreamsFactory() { |
| //PASS |
| } |
| |
| class StreamsFactoryImpl : public StreamsFactory { |
| public: |
| StreamsFactoryImpl( |
| const WriterOptions& writerOptions, |
| OutputStream* outputStream) : |
| options(writerOptions), |
| outStream(outputStream) { |
| } |
| |
| virtual std::unique_ptr<BufferedOutputStream> |
| createStream(proto::Stream_Kind kind) const override; |
| private: |
| const WriterOptions& options; |
| OutputStream* outStream; |
| }; |
| |
| std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream( |
| proto::Stream_Kind) const { |
| // In the future, we can decide compression strategy and modifier |
| // based on stream kind. But for now we just use the setting from |
| // WriterOption |
| return createCompressor( |
| options.getCompression(), |
| outStream, |
| options.getCompressionStrategy(), |
| // BufferedOutputStream initial capacity |
| 1 * 1024 * 1024, |
| options.getCompressionBlockSize(), |
| *options.getMemoryPool()); |
| } |
| |
| std::unique_ptr<StreamsFactory> createStreamsFactory( |
| const WriterOptions& options, |
| OutputStream* outStream) { |
| return std::unique_ptr<StreamsFactory>( |
| new StreamsFactoryImpl(options, outStream)); |
| } |
| |
| RowIndexPositionRecorder::~RowIndexPositionRecorder() { |
| // PASS |
| } |
| |
| proto::ColumnEncoding_Kind RleVersionMapper(RleVersion rleVersion) |
| { |
| switch (rleVersion) |
| { |
| case RleVersion_1: |
| return proto::ColumnEncoding_Kind_DIRECT; |
| case RleVersion_2: |
| return proto::ColumnEncoding_Kind_DIRECT_V2; |
| default: |
| throw InvalidArgument("Invalid param"); |
| } |
| } |
| |
| ColumnWriter::ColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| columnId(type.getColumnId()), |
| colIndexStatistics(), |
| colStripeStatistics(), |
| colFileStatistics(), |
| enableIndex(options.getEnableIndex()), |
| rowIndex(), |
| rowIndexEntry(), |
| rowIndexPosition(), |
| enableBloomFilter(false), |
| memPool(*options.getMemoryPool()), |
| indexStream(), |
| bloomFilterStream() { |
| |
| std::unique_ptr<BufferedOutputStream> presentStream = |
| factory.createStream(proto::Stream_Kind_PRESENT); |
| notNullEncoder = createBooleanRleEncoder(std::move(presentStream)); |
| |
| colIndexStatistics = createColumnStatistics(type); |
| colStripeStatistics = createColumnStatistics(type); |
| colFileStatistics = createColumnStatistics(type); |
| |
| if (enableIndex) { |
| rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex()); |
| rowIndexEntry = |
| std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry()); |
| rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>( |
| new RowIndexPositionRecorder(*rowIndexEntry)); |
| indexStream = |
| factory.createStream(proto::Stream_Kind_ROW_INDEX); |
| |
| // BloomFilters for non-UTF8 strings and non-UTC timestamps are not supported |
| if (options.isColumnUseBloomFilter(columnId) |
| && options.getBloomFilterVersion() == BloomFilterVersion::UTF8) { |
| enableBloomFilter = true; |
| bloomFilter.reset(new BloomFilterImpl( |
| options.getRowIndexStride(), options.getBloomFilterFPP())); |
| bloomFilterIndex.reset(new proto::BloomFilterIndex()); |
| bloomFilterStream = factory.createStream(proto::Stream_Kind_BLOOM_FILTER_UTF8); |
| } |
| } |
| } |
| |
| ColumnWriter::~ColumnWriter() { |
| // PASS |
| } |
| |
| void ColumnWriter::add(ColumnVectorBatch& batch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| notNullEncoder->add(batch.notNull.data() + offset, numValues, incomingMask); |
| } |
| |
| void ColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_PRESENT); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(notNullEncoder->flush()); |
| streams.push_back(stream); |
| } |
| |
| uint64_t ColumnWriter::getEstimatedSize() const { |
| return notNullEncoder->getBufferSize(); |
| } |
| |
| void ColumnWriter::getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| getProtoBufStatistics(stats, colStripeStatistics.get()); |
| } |
| |
| void ColumnWriter::mergeStripeStatsIntoFileStats() { |
| colFileStatistics->merge(*colStripeStatistics); |
| colStripeStatistics->reset(); |
| } |
| |
| void ColumnWriter::mergeRowGroupStatsIntoStripeStats() { |
| colStripeStatistics->merge(*colIndexStatistics); |
| colIndexStatistics->reset(); |
| } |
| |
| void ColumnWriter::getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| getProtoBufStatistics(stats, colFileStatistics.get()); |
| } |
| |
| void ColumnWriter::createRowIndexEntry() { |
| proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics(); |
| colIndexStatistics->toProtoBuf(*indexStats); |
| |
| *rowIndex->add_entry() = *rowIndexEntry; |
| |
| rowIndexEntry->clear_positions(); |
| rowIndexEntry->clear_statistics(); |
| |
| colStripeStatistics->merge(*colIndexStatistics); |
| colIndexStatistics->reset(); |
| |
| addBloomFilterEntry(); |
| |
| recordPosition(); |
| } |
| |
| void ColumnWriter::addBloomFilterEntry() { |
| if (enableBloomFilter) { |
| BloomFilterUTF8Utils::serialize(*bloomFilter, *bloomFilterIndex->add_bloomfilter()); |
| bloomFilter->reset(); |
| } |
| } |
| |
| void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { |
| // write row index to output stream |
| rowIndex->SerializeToZeroCopyStream(indexStream.get()); |
| |
| // construct row index stream |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_ROW_INDEX); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(indexStream->flush()); |
| streams.push_back(stream); |
| |
| // write BLOOM_FILTER_UTF8 stream |
| if (enableBloomFilter) { |
| if (!bloomFilterIndex->SerializeToZeroCopyStream(bloomFilterStream.get())) { |
| throw std::logic_error("Failed to write bloom filter stream."); |
| } |
| stream.set_kind(proto::Stream_Kind_BLOOM_FILTER_UTF8); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(bloomFilterStream->flush()); |
| streams.push_back(stream); |
| } |
| } |
| |
| void ColumnWriter::recordPosition() const { |
| notNullEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| void ColumnWriter::reset() { |
| if (enableIndex) { |
| // clear row index |
| rowIndex->clear_entry(); |
| rowIndexEntry->clear_positions(); |
| rowIndexEntry->clear_statistics(); |
| |
| // write current positions |
| recordPosition(); |
| } |
| |
| if (enableBloomFilter) { |
| bloomFilter->reset(); |
| bloomFilterIndex->clear_bloomfilter(); |
| } |
| } |
| |
| void ColumnWriter::writeDictionary() { |
| // PASS |
| } |
| |
| class StructColumnWriter : public ColumnWriter { |
| public: |
| StructColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void mergeStripeStatsIntoFileStats() override; |
| |
| virtual void mergeRowGroupStatsIntoStripeStats() override; |
| |
| virtual void createRowIndexEntry() override; |
| |
| virtual void writeIndex( |
| std::vector<proto::Stream> &streams) const override; |
| |
| virtual void writeDictionary() override; |
| |
| virtual void reset() override; |
| |
| private: |
| std::vector<std::unique_ptr<ColumnWriter>> children; |
| }; |
| |
| StructColumnWriter::StructColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options) { |
| for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) { |
| const Type& child = *type.getSubtype(i); |
| children.push_back(buildWriter(child, factory, options)); |
| } |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| void StructColumnWriter::add( |
| ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| const StructVectorBatch* structBatch = |
| dynamic_cast<const StructVectorBatch *>(&rowBatch); |
| if (structBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to StructVectorBatch"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| const char* notNull = structBatch->hasNulls ? |
| structBatch->notNull.data() + offset : nullptr; |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->add(*structBatch->fields[i], offset, numValues, notNull); |
| } |
| |
| // update stats |
| if (!notNull) { |
| colIndexStatistics->increase(numValues); |
| } else { |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull[i]) { |
| ++count; |
| } |
| } |
| colIndexStatistics->increase(count); |
| if (count < numValues) { |
| colIndexStatistics->setHasNull(true); |
| } |
| } |
| } |
| |
| void StructColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->flush(streams); |
| } |
| } |
| |
| void StructColumnWriter::writeIndex( |
| std::vector<proto::Stream> &streams) const { |
| ColumnWriter::writeIndex(streams); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->writeIndex(streams); |
| } |
| } |
| |
| uint64_t StructColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| size += children[i]->getEstimatedSize(); |
| } |
| return size; |
| } |
| |
| void StructColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); |
| encoding.set_dictionarysize(0); |
| encodings.push_back(encoding); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->getColumnEncoding(encodings); |
| } |
| } |
| |
| void StructColumnWriter::getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getStripeStatistics(stats); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->getStripeStatistics(stats); |
| } |
| } |
| |
| void StructColumnWriter::mergeStripeStatsIntoFileStats() { |
| ColumnWriter::mergeStripeStatsIntoFileStats(); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->mergeStripeStatsIntoFileStats(); |
| } |
| } |
| |
| void StructColumnWriter::getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getFileStatistics(stats); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->getFileStatistics(stats); |
| } |
| } |
| |
| void StructColumnWriter::mergeRowGroupStatsIntoStripeStats() { |
| ColumnWriter::mergeRowGroupStatsIntoStripeStats(); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->mergeRowGroupStatsIntoStripeStats(); |
| } |
| } |
| |
| void StructColumnWriter::createRowIndexEntry() { |
| ColumnWriter::createRowIndexEntry(); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->createRowIndexEntry(); |
| } |
| } |
| |
| void StructColumnWriter::reset() { |
| ColumnWriter::reset(); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->reset(); |
| } |
| } |
| |
| void StructColumnWriter::writeDictionary() { |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->writeDictionary(); |
| } |
| } |
| |
| class IntegerColumnWriter : public ColumnWriter { |
| public: |
| IntegerColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void recordPosition() const override; |
| |
| protected: |
| std::unique_ptr<RleEncoder> rleEncoder; |
| |
| private: |
| RleVersion rleVersion; |
| }; |
| |
| IntegerColumnWriter::IntegerColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options), |
| rleVersion(options.getRleVersion()) { |
| std::unique_ptr<BufferedOutputStream> dataStream = |
| factory.createStream(proto::Stream_Kind_DATA); |
| rleEncoder = createRleEncoder( |
| std::move(dataStream), |
| true, |
| rleVersion, |
| memPool, |
| options.getAlignedBitpacking()); |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| void IntegerColumnWriter::add( |
| ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| const LongVectorBatch* longBatch = |
| dynamic_cast<const LongVectorBatch*>(&rowBatch); |
| if (longBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to LongVectorBatch"); |
| } |
| IntegerColumnStatisticsImpl* intStats = |
| dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (intStats == nullptr) { |
| throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| const int64_t* data = longBatch->data.data() + offset; |
| const char* notNull = longBatch->hasNulls ? |
| longBatch->notNull.data() + offset : nullptr; |
| |
| rleEncoder->add(data, numValues, notNull); |
| |
| // update stats |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull == nullptr || notNull[i]) { |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addLong(data[i]); |
| } |
| intStats->update(data[i], 1); |
| } |
| } |
| intStats->increase(count); |
| if (count < numValues) { |
| intStats->setHasNull(true); |
| } |
| } |
| |
| void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_DATA); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(rleEncoder->flush()); |
| streams.push_back(stream); |
| } |
| |
| uint64_t IntegerColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += rleEncoder->getBufferSize(); |
| return size; |
| } |
| |
| void IntegerColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(RleVersionMapper(rleVersion)); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| } |
| |
| void IntegerColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| rleEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| class ByteColumnWriter : public ColumnWriter { |
| public: |
| ByteColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void recordPosition() const override; |
| |
| private: |
| std::unique_ptr<ByteRleEncoder> byteRleEncoder; |
| }; |
| |
| ByteColumnWriter::ByteColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options) { |
| std::unique_ptr<BufferedOutputStream> dataStream = |
| factory.createStream(proto::Stream_Kind_DATA); |
| byteRleEncoder = createByteRleEncoder(std::move(dataStream)); |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| void ByteColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); |
| if (byteBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to LongVectorBatch"); |
| } |
| IntegerColumnStatisticsImpl* intStats = |
| dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (intStats == nullptr) { |
| throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| int64_t* data = byteBatch->data.data() + offset; |
| const char* notNull = byteBatch->hasNulls ? |
| byteBatch->notNull.data() + offset : nullptr; |
| |
| char* byteData = reinterpret_cast<char*>(data); |
| for (uint64_t i = 0; i < numValues; ++i) { |
| byteData[i] = static_cast<char>(data[i]); |
| } |
| byteRleEncoder->add(byteData, numValues, notNull); |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull == nullptr || notNull[i]) { |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addLong(data[i]); |
| } |
| intStats->update(static_cast<int64_t>(byteData[i]), 1); |
| } |
| } |
| intStats->increase(count); |
| if (count < numValues) { |
| intStats->setHasNull(true); |
| } |
| } |
| |
| void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_DATA); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(byteRleEncoder->flush()); |
| streams.push_back(stream); |
| } |
| |
| uint64_t ByteColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += byteRleEncoder->getBufferSize(); |
| return size; |
| } |
| |
| void ByteColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| } |
| |
| void ByteColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| byteRleEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| class BooleanColumnWriter : public ColumnWriter { |
| public: |
| BooleanColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void recordPosition() const override; |
| |
| private: |
| std::unique_ptr<ByteRleEncoder> rleEncoder; |
| }; |
| |
| BooleanColumnWriter::BooleanColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options) { |
| std::unique_ptr<BufferedOutputStream> dataStream = |
| factory.createStream(proto::Stream_Kind_DATA); |
| rleEncoder = createBooleanRleEncoder(std::move(dataStream)); |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); |
| if (byteBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to LongVectorBatch"); |
| } |
| BooleanColumnStatisticsImpl* boolStats = |
| dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (boolStats == nullptr) { |
| throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| int64_t* data = byteBatch->data.data() + offset; |
| const char* notNull = byteBatch->hasNulls ? |
| byteBatch->notNull.data() + offset : nullptr; |
| |
| char* byteData = reinterpret_cast<char*>(data); |
| for (uint64_t i = 0; i < numValues; ++i) { |
| byteData[i] = static_cast<char>(data[i]); |
| } |
| rleEncoder->add(byteData, numValues, notNull); |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull == nullptr || notNull[i]) { |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addLong(data[i]); |
| } |
| boolStats->update(byteData[i] != 0, 1); |
| } |
| } |
| boolStats->increase(count); |
| if (count < numValues) { |
| boolStats->setHasNull(true); |
| } |
| } |
| |
| void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_DATA); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(rleEncoder->flush()); |
| streams.push_back(stream); |
| } |
| |
| uint64_t BooleanColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += rleEncoder->getBufferSize(); |
| return size; |
| } |
| |
| void BooleanColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| } |
| |
| void BooleanColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| rleEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| class DoubleColumnWriter : public ColumnWriter { |
| public: |
| DoubleColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options, |
| bool isFloat); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void recordPosition() const override; |
| |
| private: |
| bool isFloat; |
| std::unique_ptr<AppendOnlyBufferedStream> dataStream; |
| DataBuffer<char> buffer; |
| }; |
| |
| DoubleColumnWriter::DoubleColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options, |
| bool isFloatType) : |
| ColumnWriter(type, factory, options), |
| isFloat(isFloatType), |
| buffer(*options.getMemoryPool()) { |
| dataStream.reset(new AppendOnlyBufferedStream( |
| factory.createStream(proto::Stream_Kind_DATA))); |
| buffer.resize(isFloat ? 4 : 8); |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
/**
 * Writes the bit pattern of a floating point value to `output`, least
 * significant byte first.
 *
 * Floating point types are stored using their in-memory (IEEE 754 on the
 * supported platforms) bit layout. Float columns use 4 bytes per value and
 * double columns use 8 bytes.
 *
 * The bits are obtained with memcpy rather than by casting the address of
 * `input` to an integer pointer, because reading a float object through an
 * integer lvalue breaks the strict aliasing rule and is undefined
 * behaviour.
 *
 * @tparam FLOAT_TYPE   floating point type of the value
 * @tparam INTEGER_TYPE integer type of the same size as FLOAT_TYPE
 * @param input  value to encode
 * @param output destination; must have room for sizeof(INTEGER_TYPE) bytes
 */
template <typename FLOAT_TYPE, typename INTEGER_TYPE>
inline void encodeFloatNum(FLOAT_TYPE input, char* output) {
  static_assert(sizeof(FLOAT_TYPE) == sizeof(INTEGER_TYPE),
                "float and integer types must have the same size");
  INTEGER_TYPE intBits;
  memcpy(&intBits, &input, sizeof(intBits));
  for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) {
    output[i] = static_cast<char>((intBits >> (8 * i)) & 0xff);
  }
}
| |
| void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| const DoubleVectorBatch* dblBatch = |
| dynamic_cast<const DoubleVectorBatch*>(&rowBatch); |
| if (dblBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to DoubleVectorBatch"); |
| } |
| DoubleColumnStatisticsImpl* doubleStats = |
| dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (doubleStats == nullptr) { |
| throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| const double* doubleData = dblBatch->data.data() + offset; |
| const char* notNull = dblBatch->hasNulls ? |
| dblBatch->notNull.data() + offset : nullptr; |
| |
| size_t bytes = isFloat ? 4 : 8; |
| char* data = buffer.data(); |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| if (isFloat) { |
| encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data); |
| } else { |
| encodeFloatNum<double, int64_t>(doubleData[i], data); |
| } |
| dataStream->write(data, bytes); |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addDouble(doubleData[i]); |
| } |
| doubleStats->update(doubleData[i]); |
| } |
| } |
| doubleStats->increase(count); |
| if (count < numValues) { |
| doubleStats->setHasNull(true); |
| } |
| } |
| |
| void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_DATA); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(dataStream->flush()); |
| streams.push_back(stream); |
| } |
| |
| uint64_t DoubleColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += dataStream->getSize(); |
| return size; |
| } |
| |
| void DoubleColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| } |
| |
| void DoubleColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| dataStream->recordPosition(rowIndexPosition.get()); |
| } |
| |
| /** |
| * Implementation of increasing sorted string dictionary |
| */ |
| class SortedStringDictionary { |
| public: |
| struct DictEntry { |
| DictEntry(const char * str, size_t len):data(str),length(len) {} |
| const char * data; |
| size_t length; |
| }; |
| |
| SortedStringDictionary():totalLength(0) {} |
| |
| // insert a new string into dictionary, return its insertion order |
| size_t insert(const char * data, size_t len); |
| |
| // write dictionary data & length to output buffer |
| void flush(AppendOnlyBufferedStream * dataStream, |
| RleEncoder * lengthEncoder) const; |
| |
| // reorder input index buffer from insertion order to dictionary order |
| void reorder(std::vector<int64_t>& idxBuffer) const; |
| |
| // get dict entries in insertion order |
| void getEntriesInInsertionOrder(std::vector<const DictEntry *>&) const; |
| |
| // return count of entries |
| size_t size() const; |
| |
| // return total length of strings in the dictioanry |
| uint64_t length() const; |
| |
| void clear(); |
| |
| private: |
| struct LessThan { |
| bool operator()(const DictEntry& left, const DictEntry& right) const { |
| int ret = memcmp(left.data, right.data, std::min(left.length, right.length)); |
| if (ret != 0) { |
| return ret < 0; |
| } |
| return left.length < right.length; |
| } |
| }; |
| |
| std::map<DictEntry, size_t, LessThan> dict; |
| std::vector<std::vector<char>> data; |
| uint64_t totalLength; |
| |
| // use friend class here to avoid being bothered by const function calls |
| friend class StringColumnWriter; |
| friend class CharColumnWriter; |
| friend class VarCharColumnWriter; |
| // store indexes of insertion order in the dictionary for not-null rows |
| std::vector<int64_t> idxInDictBuffer; |
| }; |
| |
| // insert a new string into dictionary, return its insertion order |
| size_t SortedStringDictionary::insert(const char * str, size_t len) { |
| auto ret = dict.insert({DictEntry(str, len), dict.size()}); |
| if (ret.second) { |
| // make a copy to internal storage |
| data.push_back(std::vector<char>(len)); |
| memcpy(data.back().data(), str, len); |
| // update dictionary entry to link pointer to internal storage |
| DictEntry * entry = const_cast<DictEntry *>(&(ret.first->first)); |
| entry->data = data.back().data(); |
| totalLength += len; |
| } |
| return ret.first->second; |
| } |
| |
| // write dictionary data & length to output buffer |
| void SortedStringDictionary::flush(AppendOnlyBufferedStream * dataStream, |
| RleEncoder * lengthEncoder) const { |
| for (auto it = dict.cbegin(); it != dict.cend(); ++it) { |
| dataStream->write(it->first.data, it->first.length); |
| lengthEncoder->write(static_cast<int64_t>(it->first.length)); |
| } |
| } |
| |
| /** |
| * Reorder input index buffer from insertion order to dictionary order |
| * |
| * We require this function because string values are buffered by indexes |
| * in their insertion order. Until the entire dictionary is complete can |
| * we get their sorted indexes in the dictionary in that ORC specification |
| * demands dictionary should be ordered. Therefore this function transforms |
| * the indexes from insertion order to dictionary value order for final |
| * output. |
| */ |
| void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const { |
| // iterate the dictionary to get mapping from insertion order to value order |
| std::vector<size_t> mapping(dict.size()); |
| size_t dictIdx = 0; |
| for (auto it = dict.cbegin(); it != dict.cend(); ++it) { |
| mapping[it->second] = dictIdx++; |
| } |
| |
| // do the transformation |
| for (size_t i = 0; i != idxBuffer.size(); ++i) { |
| idxBuffer[i] = static_cast<int64_t>( |
| mapping[static_cast<size_t>(idxBuffer[i])]); |
| } |
| } |
| |
| // get dict entries in insertion order |
| void SortedStringDictionary::getEntriesInInsertionOrder( |
| std::vector<const DictEntry *>& entries) const { |
| entries.resize(dict.size()); |
| for (auto it = dict.cbegin(); it != dict.cend(); ++it) { |
| entries[it->second] = &(it->first); |
| } |
| } |
| |
| // return count of entries |
| size_t SortedStringDictionary::size() const { |
| return dict.size(); |
| } |
| |
| // return total length of strings in the dictioanry |
| uint64_t SortedStringDictionary::length() const { |
| return totalLength; |
| } |
| |
| void SortedStringDictionary::clear() { |
| totalLength = 0; |
| data.clear(); |
| dict.clear(); |
| } |
| |
| class StringColumnWriter : public ColumnWriter { |
| public: |
| StringColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void recordPosition() const override; |
| |
| virtual void createRowIndexEntry() override; |
| |
| virtual void writeDictionary() override; |
| |
| virtual void reset() override; |
| |
| private: |
| /** |
| * dictionary related functions |
| */ |
| bool checkDictionaryKeyRatio(); |
| void createDirectStreams(); |
| void createDictStreams(); |
| void deleteDictStreams(); |
| void fallbackToDirectEncoding(); |
| |
| protected: |
| RleVersion rleVersion; |
| bool useCompression; |
| const StreamsFactory& streamsFactory; |
| bool alignedBitPacking; |
| |
| // direct encoding streams |
| std::unique_ptr<RleEncoder> directLengthEncoder; |
| std::unique_ptr<AppendOnlyBufferedStream> directDataStream; |
| |
| // dictionary encoding streams |
| std::unique_ptr<RleEncoder> dictDataEncoder; |
| std::unique_ptr<RleEncoder> dictLengthEncoder; |
| std::unique_ptr<AppendOnlyBufferedStream> dictStream; |
| |
| /** |
| * dictionary related variables |
| */ |
| SortedStringDictionary dictionary; |
| // whether or not dictionary checking is done |
| bool doneDictionaryCheck; |
| // whether or not it should be used |
| bool useDictionary; |
| // keys in the dictionary should not exceed this ratio |
| double dictSizeThreshold; |
| |
| // record start row of each row group; null rows are skipped |
| mutable std::vector<size_t> startOfRowGroups; |
| }; |
| |
| StringColumnWriter::StringColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options), |
| rleVersion(options.getRleVersion()), |
| useCompression(options.getCompression() != CompressionKind_NONE), |
| streamsFactory(factory), |
| alignedBitPacking(options.getAlignedBitpacking()), |
| doneDictionaryCheck(false), |
| useDictionary(options.getEnableDictionary()), |
| dictSizeThreshold(options.getDictionaryKeySizeThreshold()){ |
| if (type.getKind() == TypeKind::BINARY) { |
| useDictionary = false; |
| doneDictionaryCheck = true; |
| } |
| |
| if (useDictionary) { |
| createDictStreams(); |
| } else { |
| doneDictionaryCheck = true; |
| createDirectStreams(); |
| } |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| void StringColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| const StringVectorBatch* stringBatch = |
| dynamic_cast<const StringVectorBatch*>(&rowBatch); |
| if (stringBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to StringVectorBatch"); |
| } |
| |
| StringColumnStatisticsImpl* strStats = |
| dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (strStats == nullptr) { |
| throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| char *const * data = stringBatch->data.data() + offset; |
| const int64_t* length = stringBatch->length.data() + offset; |
| const char* notNull = stringBatch->hasNulls ? |
| stringBatch->notNull.data() + offset : nullptr; |
| |
| if (!useDictionary){ |
| directLengthEncoder->add(length, numValues, notNull); |
| } |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| const size_t len = static_cast<size_t>(length[i]); |
| if (useDictionary) { |
| size_t index = dictionary.insert(data[i], len); |
| dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); |
| } else { |
| directDataStream->write(data[i], len); |
| } |
| if (enableBloomFilter) { |
| bloomFilter->addBytes(data[i], static_cast<int64_t>(len)); |
| } |
| strStats->update(data[i], len); |
| ++count; |
| } |
| } |
| strStats->increase(count); |
| if (count < numValues) { |
| strStats->setHasNull(true); |
| } |
| } |
| |
| void StringColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| if (useDictionary) { |
| proto::Stream data; |
| data.set_kind(proto::Stream_Kind_DATA); |
| data.set_column(static_cast<uint32_t>(columnId)); |
| data.set_length(dictDataEncoder->flush()); |
| streams.push_back(data); |
| |
| proto::Stream dict; |
| dict.set_kind(proto::Stream_Kind_DICTIONARY_DATA); |
| dict.set_column(static_cast<uint32_t>(columnId)); |
| dict.set_length(dictStream->flush()); |
| streams.push_back(dict); |
| |
| proto::Stream length; |
| length.set_kind(proto::Stream_Kind_LENGTH); |
| length.set_column(static_cast<uint32_t>(columnId)); |
| length.set_length(dictLengthEncoder->flush()); |
| streams.push_back(length); |
| } else { |
| proto::Stream length; |
| length.set_kind(proto::Stream_Kind_LENGTH); |
| length.set_column(static_cast<uint32_t>(columnId)); |
| length.set_length(directLengthEncoder->flush()); |
| streams.push_back(length); |
| |
| proto::Stream data; |
| data.set_kind(proto::Stream_Kind_DATA); |
| data.set_column(static_cast<uint32_t>(columnId)); |
| data.set_length(directDataStream->flush()); |
| streams.push_back(data); |
| } |
| } |
| |
| uint64_t StringColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| if (!useDictionary) { |
| size += directLengthEncoder->getBufferSize(); |
| size += directDataStream->getSize(); |
| } else { |
| size += dictionary.length(); |
| size += dictionary.size() * sizeof(int32_t); |
| size += dictionary.idxInDictBuffer.size() * sizeof(int32_t); |
| if (useCompression) { |
| size /= 3; // estimated ratio is 3:1 |
| } |
| } |
| return size; |
| } |
| |
| void StringColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| if (!useDictionary) { |
| encoding.set_kind(rleVersion == RleVersion_1 ? |
| proto::ColumnEncoding_Kind_DIRECT : |
| proto::ColumnEncoding_Kind_DIRECT_V2); |
| } else { |
| encoding.set_kind(rleVersion == RleVersion_1 ? |
| proto::ColumnEncoding_Kind_DICTIONARY : |
| proto::ColumnEncoding_Kind_DICTIONARY_V2); |
| } |
| encoding.set_dictionarysize(static_cast<uint32_t>(dictionary.size())); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| } |
| |
| void StringColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| if (!useDictionary) { |
| directDataStream->recordPosition(rowIndexPosition.get()); |
| directLengthEncoder->recordPosition(rowIndexPosition.get()); |
| } else { |
| if (enableIndex) { |
| startOfRowGroups.push_back(dictionary.idxInDictBuffer.size()); |
| } |
| } |
| } |
| |
| bool StringColumnWriter::checkDictionaryKeyRatio() { |
| if (!doneDictionaryCheck) { |
| useDictionary = dictionary.size() <= static_cast<size_t>( |
| static_cast<double>(dictionary.idxInDictBuffer.size()) * dictSizeThreshold); |
| doneDictionaryCheck = true; |
| } |
| |
| return useDictionary; |
| } |
| |
| void StringColumnWriter::createRowIndexEntry() { |
| if (useDictionary && !doneDictionaryCheck) { |
| if (!checkDictionaryKeyRatio()) { |
| fallbackToDirectEncoding(); |
| } |
| } |
| ColumnWriter::createRowIndexEntry(); |
| } |
| |
| void StringColumnWriter::reset() { |
| ColumnWriter::reset(); |
| |
| dictionary.clear(); |
| dictionary.idxInDictBuffer.resize(0); |
| startOfRowGroups.clear(); |
| startOfRowGroups.push_back(0); |
| } |
| |
| void StringColumnWriter::createDirectStreams() { |
| std::unique_ptr<BufferedOutputStream> directLengthStream = |
| streamsFactory.createStream(proto::Stream_Kind_LENGTH); |
| directLengthEncoder = createRleEncoder(std::move(directLengthStream), |
| false, |
| rleVersion, |
| memPool, |
| alignedBitPacking); |
| directDataStream.reset(new AppendOnlyBufferedStream( |
| streamsFactory.createStream(proto::Stream_Kind_DATA))); |
| } |
| |
| void StringColumnWriter::createDictStreams() { |
| std::unique_ptr<BufferedOutputStream> dictDataStream = |
| streamsFactory.createStream(proto::Stream_Kind_DATA); |
| dictDataEncoder = createRleEncoder(std::move(dictDataStream), |
| false, |
| rleVersion, |
| memPool, |
| alignedBitPacking); |
| std::unique_ptr<BufferedOutputStream> dictLengthStream = |
| streamsFactory.createStream(proto::Stream_Kind_LENGTH); |
| dictLengthEncoder = createRleEncoder(std::move(dictLengthStream), |
| false, |
| rleVersion, |
| memPool, |
| alignedBitPacking); |
| dictStream.reset(new AppendOnlyBufferedStream( |
| streamsFactory.createStream(proto::Stream_Kind_DICTIONARY_DATA))); |
| } |
| |
| void StringColumnWriter::deleteDictStreams() { |
| dictDataEncoder.reset(nullptr); |
| dictLengthEncoder.reset(nullptr); |
| dictStream.reset(nullptr); |
| |
| dictionary.clear(); |
| dictionary.idxInDictBuffer.clear(); |
| startOfRowGroups.clear(); |
| } |
| |
| void StringColumnWriter::writeDictionary() { |
| if (useDictionary && !doneDictionaryCheck) { |
| // when index is disabled, dictionary check happens while writing 1st stripe |
| if (!checkDictionaryKeyRatio()) { |
| fallbackToDirectEncoding(); |
| return; |
| } |
| } |
| |
| if (useDictionary) { |
| // flush dictionary data & length streams |
| dictionary.flush(dictStream.get(), dictLengthEncoder.get()); |
| |
| // convert index from insertion order to dictionary order |
| dictionary.reorder(dictionary.idxInDictBuffer); |
| |
| // write data sequences |
| int64_t * data = dictionary.idxInDictBuffer.data(); |
| if (enableIndex) { |
| size_t prevOffset = 0; |
| for (size_t i = 0; i < startOfRowGroups.size(); ++i) { |
| // write sequences in batch for a row group stride |
| size_t offset = startOfRowGroups[i]; |
| dictDataEncoder->add(data + prevOffset, offset - prevOffset, nullptr); |
| |
| // update index positions |
| int rowGroupId = static_cast<int>(i); |
| proto::RowIndexEntry* indexEntry = |
| (rowGroupId < rowIndex->entry_size()) ? |
| rowIndex->mutable_entry(rowGroupId) : rowIndexEntry.get(); |
| |
| // add positions for direct streams |
| RowIndexPositionRecorder recorder(*indexEntry); |
| dictDataEncoder->recordPosition(&recorder); |
| |
| prevOffset = offset; |
| } |
| |
| dictDataEncoder->add(data + prevOffset, |
| dictionary.idxInDictBuffer.size() - prevOffset, |
| nullptr); |
| } else { |
| dictDataEncoder->add(data, dictionary.idxInDictBuffer.size(), nullptr); |
| } |
| } |
| } |
| |
| void StringColumnWriter::fallbackToDirectEncoding() { |
| createDirectStreams(); |
| |
| if (enableIndex) { |
| // fallback happens at the 1st row group; |
| // simply complete positions for direct streams |
| proto::RowIndexEntry * indexEntry = rowIndexEntry.get(); |
| RowIndexPositionRecorder recorder(*indexEntry); |
| directDataStream->recordPosition(&recorder); |
| directLengthEncoder->recordPosition(&recorder); |
| } |
| |
| // get dictionary entries in insertion order |
| std::vector<const SortedStringDictionary::DictEntry *> entries; |
| dictionary.getEntriesInInsertionOrder(entries); |
| |
| // store each length of the data into a vector |
| const SortedStringDictionary::DictEntry * dictEntry = nullptr; |
| for (uint64_t i = 0; i != dictionary.idxInDictBuffer.size(); ++i) { |
| // write one row data in direct encoding |
| dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer[i])]; |
| directDataStream->write(dictEntry->data, dictEntry->length); |
| directLengthEncoder->write(static_cast<int64_t>(dictEntry->length)); |
| } |
| |
| deleteDictStreams(); |
| } |
| |
/**
 * Helpers for working on UTF-8 encoded byte sequences. None of them
 * validates the input; they rely on the start byte / continuation byte
 * distinction made by isUtfStartByte.
 */
struct Utf8Utils {
  /**
   * Counts how many utf-8 chars the input data contains, i.e. the number
   * of bytes that are not continuation bytes.
   *
   * @param data the bytes of UTF-8
   * @param length the number of bytes in data
   */
  static uint64_t charLength(const char * data, uint64_t length) {
    uint64_t chars = 0;
    for (uint64_t i = 0; i < length; i++) {
      if (isUtfStartByte(data[i])) {
        chars++;
      }
    }
    return chars;
  }

  /**
   * Return the number of bytes required to read at most maxCharLength
   * characters in full from a utf-8 encoded byte array provided
   * by data. This does not validate utf-8 data, but
   * operates correctly on already valid utf-8 data.
   *
   * @param maxCharLength number of characters required
   * @param data the bytes of UTF-8
   * @param length the length of data to truncate
   */
  static uint64_t truncateBytesTo(uint64_t maxCharLength,
                                  const char * data,
                                  uint64_t length) {
    // a character takes at least one byte, so this input cannot hold
    // more than maxCharLength characters
    if (length <= maxCharLength) {
      return length;
    }
    uint64_t chars = 0;
    for (uint64_t i = 0; i < length; i++) {
      if (isUtfStartByte(data[i])) {
        chars++;
      }
      // byte i starts the first character beyond the limit
      if (chars > maxCharLength) {
        return i;
      }
    }
    // everything fits
    return length;
  }

  /**
   * Checks if b is the first byte of a UTF-8 character, i.e. its two top
   * bits are not 10.
   */
  inline static bool isUtfStartByte(char b) {
    return (b & 0xC0) != 0x80;
  }

  /**
   * Find the start of the last character that ends in the current string
   * by scanning backwards from until down to from (both inclusive).
   *
   * @param text the bytes of the utf-8
   * @param from the first byte location
   * @param until the last byte location
   * @return the index of the start byte of the last character
   * @throws std::logic_error if no start byte exists in [from, until]
   */
  static uint64_t findLastCharacter(const char * text, uint64_t from, uint64_t until) {
    if (until >= from) {
      uint64_t posn = until;
      while (true) {
        if (isUtfStartByte(text[posn])) {
          return posn;
        }
        // stop before stepping below from: posn is unsigned, so
        // decrementing it at 0 would wrap around instead of ending the scan
        if (posn == from) {
          break;
        }
        posn -= 1;
      }
    }
    /* beginning of a valid char not found */
    throw std::logic_error(
        "Could not truncate string, beginning of a valid char not found");
  }
};
| |
| class CharColumnWriter : public StringColumnWriter { |
| public: |
| CharColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| StringColumnWriter(type, factory, options), |
| maxLength(type.getMaximumLength()), |
| padBuffer(*options.getMemoryPool()) { |
| // utf-8 is currently 4 bytes long, but it could be up to 6 |
| padBuffer.resize(maxLength * 6); |
| } |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| private: |
| uint64_t maxLength; |
| DataBuffer<char> padBuffer; |
| }; |
| |
| void CharColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); |
| if (charsBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to StringVectorBatch"); |
| } |
| |
| StringColumnStatisticsImpl* strStats = |
| dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (strStats == nullptr) { |
| throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| char** data = charsBatch->data.data() + offset; |
| int64_t* length = charsBatch->length.data() + offset; |
| const char* notNull = charsBatch->hasNulls ? |
| charsBatch->notNull.data() + offset : nullptr; |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| const char * charData = nullptr; |
| uint64_t originLength = static_cast<uint64_t>(length[i]); |
| uint64_t charLength = Utf8Utils::charLength(data[i], originLength); |
| if (charLength >= maxLength) { |
| charData = data[i]; |
| length[i] = static_cast<int64_t>( |
| Utf8Utils::truncateBytesTo(maxLength, data[i], originLength)); |
| } else { |
| charData = padBuffer.data(); |
| // the padding is exactly 1 byte per char |
| length[i] = length[i] + static_cast<int64_t>(maxLength - charLength); |
| memcpy(padBuffer.data(), data[i], originLength); |
| memset(padBuffer.data() + originLength, |
| ' ', |
| static_cast<size_t>(length[i]) - originLength); |
| } |
| |
| if (useDictionary) { |
| size_t index = dictionary.insert(charData, static_cast<size_t>(length[i])); |
| dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); |
| } else { |
| directDataStream->write(charData, static_cast<size_t>(length[i])); |
| } |
| |
| if (enableBloomFilter) { |
| bloomFilter->addBytes(data[i], length[i]); |
| } |
| strStats->update(charData, static_cast<size_t>(length[i])); |
| ++count; |
| } |
| } |
| |
| if (!useDictionary) { |
| directLengthEncoder->add(length, numValues, notNull); |
| } |
| |
| strStats->increase(count); |
| if (count < numValues) { |
| strStats->setHasNull(true); |
| } |
| } |
| |
| class VarCharColumnWriter : public StringColumnWriter { |
| public: |
| VarCharColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| StringColumnWriter(type, factory, options), |
| maxLength(type.getMaximumLength()) { |
| // PASS |
| } |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| private: |
| uint64_t maxLength; |
| }; |
| |
| void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); |
| if (charsBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to StringVectorBatch"); |
| } |
| |
| StringColumnStatisticsImpl* strStats = |
| dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (strStats == nullptr) { |
| throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| char* const* data = charsBatch->data.data() + offset; |
| int64_t* length = charsBatch->length.data() + offset; |
| const char* notNull = charsBatch->hasNulls ? |
| charsBatch->notNull.data() + offset : nullptr; |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| uint64_t itemLength = Utf8Utils::truncateBytesTo( |
| maxLength, data[i], static_cast<uint64_t>(length[i])); |
| length[i] = static_cast<int64_t>(itemLength); |
| |
| if (useDictionary) { |
| size_t index = dictionary.insert(data[i], static_cast<size_t>(length[i])); |
| dictionary.idxInDictBuffer.push_back(static_cast<int64_t>(index)); |
| } else { |
| directDataStream->write(data[i], static_cast<size_t>(length[i])); |
| } |
| |
| if (enableBloomFilter) { |
| bloomFilter->addBytes(data[i], length[i]); |
| } |
| strStats->update(data[i], static_cast<size_t>(length[i])); |
| ++count; |
| } |
| } |
| |
| if (!useDictionary) { |
| directLengthEncoder->add(length, numValues, notNull); |
| } |
| |
| strStats->increase(count); |
| if (count < numValues) { |
| strStats->setHasNull(true); |
| } |
| } |
| |
| class BinaryColumnWriter : public StringColumnWriter { |
| public: |
| BinaryColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| StringColumnWriter(type, factory, options) { |
| // PASS |
| } |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| }; |
| |
| void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); |
| if (binBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to StringVectorBatch"); |
| } |
| |
| BinaryColumnStatisticsImpl* binStats = |
| dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (binStats == nullptr) { |
| throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| char** data = binBatch->data.data() + offset; |
| int64_t* length = binBatch->length.data() + offset; |
| const char* notNull = binBatch->hasNulls ? |
| binBatch->notNull.data() + offset : nullptr; |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| uint64_t unsignedLength = static_cast<uint64_t>(length[i]); |
| if (!notNull || notNull[i]) { |
| directDataStream->write(data[i], unsignedLength); |
| |
| binStats->update(unsignedLength); |
| ++count; |
| } |
| } |
| directLengthEncoder->add(length, numValues, notNull); |
| binStats->increase(count); |
| if (count < numValues) { |
| binStats->setHasNull(true); |
| } |
| } |
| |
| class TimestampColumnWriter : public ColumnWriter { |
| public: |
| TimestampColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void recordPosition() const override; |
| |
| protected: |
| std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder; |
| |
| private: |
| RleVersion rleVersion; |
| const Timezone& timezone; |
| }; |
| |
| TimestampColumnWriter::TimestampColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options), |
| rleVersion(options.getRleVersion()), |
| timezone(getTimezoneByName("GMT")){ |
| std::unique_ptr<BufferedOutputStream> dataStream = |
| factory.createStream(proto::Stream_Kind_DATA); |
| std::unique_ptr<BufferedOutputStream> secondaryStream = |
| factory.createStream(proto::Stream_Kind_SECONDARY); |
| secRleEncoder = createRleEncoder(std::move(dataStream), |
| true, |
| rleVersion, |
| memPool, |
| options.getAlignedBitpacking()); |
| nanoRleEncoder = createRleEncoder(std::move(secondaryStream), |
| false, |
| rleVersion, |
| memPool, |
| options.getAlignedBitpacking()); |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
// Nanoseconds often end in many decimal zeros. When a value is a multiple
// of 100, trailing decimal zeros are stripped (at most 8 of them) and the
// low three bits store the number of stripped zeros minus one; otherwise
// the low three bits are 0. The remaining digits are kept in the upper
// bits. Thus 1000 nanoseconds is serialized as 0x0a and 100000 as 0x0c.
static int64_t formatNano(int64_t nanos) {
  if (nanos == 0) {
    return 0;
  }
  if (nanos % 100 != 0) {
    // fewer than two trailing zeros: keep the value as is
    return nanos << 3;
  }

  // two zeros are stripped up front and counted as 1
  int64_t digits = nanos / 100;
  int64_t zeroCode = 1;
  for (; zeroCode < 7 && digits % 10 == 0; ++zeroCode) {
    digits /= 10;
  }
  return (digits << 3) | zeroCode;
}
| |
| void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| TimestampVectorBatch* tsBatch = |
| dynamic_cast<TimestampVectorBatch*>(&rowBatch); |
| if (tsBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to TimestampVectorBatch"); |
| } |
| |
| TimestampColumnStatisticsImpl* tsStats = |
| dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (tsStats == nullptr) { |
| throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| const char* notNull = tsBatch->hasNulls ? |
| tsBatch->notNull.data() + offset : nullptr; |
| int64_t *secs = tsBatch->data.data() + offset; |
| int64_t *nanos = tsBatch->nanoseconds.data() + offset; |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull == nullptr || notNull[i]) { |
| // TimestampVectorBatch already stores data in UTC |
| int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000; |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addLong(millsUTC); |
| } |
| tsStats->update(millsUTC); |
| |
| if (secs[i] < 0 && nanos[i] != 0) { |
| secs[i] += 1; |
| } |
| |
| secs[i] -= timezone.getEpoch(); |
| nanos[i] = formatNano(nanos[i]); |
| } |
| } |
| tsStats->increase(count); |
| if (count < numValues) { |
| tsStats->setHasNull(true); |
| } |
| |
| secRleEncoder->add(secs, numValues, notNull); |
| nanoRleEncoder->add(nanos, numValues, notNull); |
| } |
| |
| void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream dataStream; |
| dataStream.set_kind(proto::Stream_Kind_DATA); |
| dataStream.set_column(static_cast<uint32_t>(columnId)); |
| dataStream.set_length(secRleEncoder->flush()); |
| streams.push_back(dataStream); |
| |
| proto::Stream secondaryStream; |
| secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); |
| secondaryStream.set_column(static_cast<uint32_t>(columnId)); |
| secondaryStream.set_length(nanoRleEncoder->flush()); |
| streams.push_back(secondaryStream); |
| } |
| |
| uint64_t TimestampColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += secRleEncoder->getBufferSize(); |
| size += nanoRleEncoder->getBufferSize(); |
| return size; |
| } |
| |
| void TimestampColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(RleVersionMapper(rleVersion)); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| } |
| |
| void TimestampColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| secRleEncoder->recordPosition(rowIndexPosition.get()); |
| nanoRleEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| class DateColumnWriter : public IntegerColumnWriter { |
| public: |
| DateColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| }; |
| |
| DateColumnWriter::DateColumnWriter( |
| const Type &type, |
| const StreamsFactory &factory, |
| const WriterOptions &options) : |
| IntegerColumnWriter(type, factory, options) { |
| // PASS |
| } |
| |
| void DateColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| const LongVectorBatch* longBatch = |
| dynamic_cast<const LongVectorBatch*>(&rowBatch); |
| if (longBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to LongVectorBatch"); |
| } |
| |
| DateColumnStatisticsImpl* dateStats = |
| dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (dateStats == nullptr) { |
| throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| const int64_t* data = longBatch->data.data() + offset; |
| const char* notNull = longBatch->hasNulls ? |
| longBatch->notNull.data() + offset : nullptr; |
| |
| rleEncoder->add(data, numValues, notNull); |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| ++count; |
| dateStats->update(static_cast<int32_t>(data[i])); |
| if (enableBloomFilter) { |
| bloomFilter->addLong(data[i]); |
| } |
| } |
| } |
| dateStats->increase(count); |
| if (count < numValues) { |
| dateStats->setHasNull(true); |
| } |
| } |
| |
| class Decimal64ColumnWriter : public ColumnWriter { |
| public: |
| static const uint32_t MAX_PRECISION_64 = 18; |
| static const uint32_t MAX_PRECISION_128 = 38; |
| |
| Decimal64ColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void recordPosition() const override; |
| |
| protected: |
| RleVersion rleVersion; |
| uint64_t precision; |
| uint64_t scale; |
| std::unique_ptr<AppendOnlyBufferedStream> valueStream; |
| std::unique_ptr<RleEncoder> scaleEncoder; |
| |
| private: |
| char buffer[10]; |
| }; |
| |
| Decimal64ColumnWriter::Decimal64ColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options), |
| rleVersion(options.getRleVersion()), |
| precision(type.getPrecision()), |
| scale(type.getScale()) { |
| valueStream.reset(new AppendOnlyBufferedStream( |
| factory.createStream(proto::Stream_Kind_DATA))); |
| std::unique_ptr<BufferedOutputStream> scaleStream = |
| factory.createStream(proto::Stream_Kind_SECONDARY); |
| scaleEncoder = createRleEncoder(std::move(scaleStream), |
| true, |
| rleVersion, |
| memPool, |
| options.getAlignedBitpacking()); |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| const Decimal64VectorBatch* decBatch = |
| dynamic_cast<const Decimal64VectorBatch*>(&rowBatch); |
| if (decBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to Decimal64VectorBatch"); |
| } |
| |
| DecimalColumnStatisticsImpl* decStats = |
| dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (decStats == nullptr) { |
| throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| const char* notNull = decBatch->hasNulls ? |
| decBatch->notNull.data() + offset : nullptr; |
| const int64_t* values = decBatch->values.data() + offset; |
| |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| int64_t val = zigZag(values[i]); |
| char* data = buffer; |
| while (true) { |
| if ((val & ~0x7f) == 0) { |
| *(data++) = (static_cast<char>(val)); |
| break; |
| } else { |
| *(data++) = static_cast<char>(0x80 | (val & 0x7f)); |
| // cast val to unsigned so as to force 0-fill right shift |
| val = (static_cast<uint64_t>(val) >> 7); |
| } |
| } |
| valueStream->write(buffer, static_cast<size_t>(data - buffer)); |
| ++count; |
| if (enableBloomFilter) { |
| std::string decimal = Decimal( |
| values[i], static_cast<int32_t>(scale)).toString(true); |
| bloomFilter->addBytes( |
| decimal.c_str(), static_cast<int64_t>(decimal.size())); |
| } |
| decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); |
| } |
| } |
| decStats->increase(count); |
| if (count < numValues) { |
| decStats->setHasNull(true); |
| } |
| std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); |
| scaleEncoder->add(scales.data(), numValues, notNull); |
| } |
| |
| void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream dataStream; |
| dataStream.set_kind(proto::Stream_Kind_DATA); |
| dataStream.set_column(static_cast<uint32_t>(columnId)); |
| dataStream.set_length(valueStream->flush()); |
| streams.push_back(dataStream); |
| |
| proto::Stream secondaryStream; |
| secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); |
| secondaryStream.set_column(static_cast<uint32_t>(columnId)); |
| secondaryStream.set_length(scaleEncoder->flush()); |
| streams.push_back(secondaryStream); |
| } |
| |
| uint64_t Decimal64ColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += valueStream->getSize(); |
| size += scaleEncoder->getBufferSize(); |
| return size; |
| } |
| |
| void Decimal64ColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(RleVersionMapper(rleVersion)); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| } |
| |
| void Decimal64ColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| valueStream->recordPosition(rowIndexPosition.get()); |
| scaleEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| class Decimal128ColumnWriter : public Decimal64ColumnWriter { |
| public: |
| Decimal128ColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| private: |
| char buffer[20]; |
| }; |
| |
| Decimal128ColumnWriter::Decimal128ColumnWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| Decimal64ColumnWriter(type, factory, options) { |
| // PASS |
| } |
| |
| // Zigzag encoding moves the sign bit to the least significant bit using the |
| // expression (val « 1) ^ (val » 63) and derives its name from the fact that |
| // positive and negative numbers alternate once encoded. |
| Int128 zigZagInt128(const Int128& value) { |
| bool isNegative = value < 0; |
| Int128 val = value.abs(); |
| val <<= 1; |
| if (isNegative) { |
| val -= 1; |
| } |
| return val; |
| } |
| |
| void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| const Decimal128VectorBatch* decBatch = |
| dynamic_cast<const Decimal128VectorBatch*>(&rowBatch); |
| if (decBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to Decimal128VectorBatch"); |
| } |
| |
| DecimalColumnStatisticsImpl* decStats = |
| dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); |
| if (decStats == nullptr) { |
| throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| const char* notNull = decBatch->hasNulls ? |
| decBatch->notNull.data() + offset : nullptr; |
| const Int128* values = decBatch->values.data() + offset; |
| |
| // The current encoding of decimal columns stores the integer representation |
| // of the value as an unbounded length zigzag encoded base 128 varint. |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| Int128 val = zigZagInt128(values[i]); |
| char* data = buffer; |
| while (true) { |
| if ((val & ~0x7f) == 0) { |
| *(data++) = (static_cast<char>(val.getLowBits())); |
| break; |
| } else { |
| *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f)); |
| val >>= 7; |
| } |
| } |
| valueStream->write(buffer, static_cast<size_t>(data - buffer)); |
| |
| ++count; |
| if (enableBloomFilter) { |
| std::string decimal = Decimal( |
| values[i], static_cast<int32_t>(scale)).toString(true); |
| bloomFilter->addBytes( |
| decimal.c_str(), static_cast<int64_t>(decimal.size())); |
| } |
| decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); |
| } |
| } |
| decStats->increase(count); |
| if (count < numValues) { |
| decStats->setHasNull(true); |
| } |
| std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); |
| scaleEncoder->add(scales.data(), numValues, notNull); |
| } |
| |
| class ListColumnWriter : public ColumnWriter { |
| public: |
| ListColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| ~ListColumnWriter() override; |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void mergeStripeStatsIntoFileStats() override; |
| |
| virtual void mergeRowGroupStatsIntoStripeStats() override; |
| |
| virtual void createRowIndexEntry() override; |
| |
| virtual void writeIndex( |
| std::vector<proto::Stream> &streams) const override; |
| |
| virtual void recordPosition() const override; |
| |
| virtual void writeDictionary() override; |
| |
| virtual void reset() override; |
| |
| private: |
| std::unique_ptr<RleEncoder> lengthEncoder; |
| RleVersion rleVersion; |
| std::unique_ptr<ColumnWriter> child; |
| }; |
| |
| ListColumnWriter::ListColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options), |
| rleVersion(options.getRleVersion()){ |
| |
| std::unique_ptr<BufferedOutputStream> lengthStream = |
| factory.createStream(proto::Stream_Kind_LENGTH); |
| lengthEncoder = createRleEncoder(std::move(lengthStream), |
| false, |
| rleVersion, |
| memPool, |
| options.getAlignedBitpacking()); |
| |
| if (type.getSubtypeCount() == 1) { |
| child = buildWriter(*type.getSubtype(0), factory, options); |
| } |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| ListColumnWriter::~ListColumnWriter() { |
| // PASS |
| } |
| |
| void ListColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(&rowBatch); |
| if (listBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to ListVectorBatch"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| int64_t* offsets = listBatch->offsets.data() + offset; |
| const char* notNull = listBatch->hasNulls ? |
| listBatch->notNull.data() + offset : nullptr; |
| |
| uint64_t elemOffset = static_cast<uint64_t>(offsets[0]); |
| uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]); |
| |
| // translate offsets to lengths |
| for (uint64_t i = 0; i != numValues; ++i) { |
| offsets[i] = offsets[i + 1] - offsets[i]; |
| } |
| |
| // unnecessary to deal with null as elements are packed together |
| if (child.get()) { |
| child->add(*listBatch->elements, elemOffset, totalNumValues, nullptr); |
| } |
| lengthEncoder->add(offsets, numValues, notNull); |
| |
| if (enableIndex) { |
| if (!notNull) { |
| colIndexStatistics->increase(numValues); |
| } else { |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull[i]) { |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addLong(offsets[i]); |
| } |
| } |
| } |
| colIndexStatistics->increase(count); |
| if (count < numValues) { |
| colIndexStatistics->setHasNull(true); |
| } |
| } |
| } |
| } |
| |
| void ListColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_LENGTH); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(lengthEncoder->flush()); |
| streams.push_back(stream); |
| |
| if (child.get()) { |
| child->flush(streams); |
| } |
| } |
| |
| void ListColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { |
| ColumnWriter::writeIndex(streams); |
| if (child.get()) { |
| child->writeIndex(streams); |
| } |
| } |
| |
| uint64_t ListColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| if (child.get()) { |
| size += lengthEncoder->getBufferSize(); |
| size += child->getEstimatedSize(); |
| } |
| return size; |
| } |
| |
| void ListColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(RleVersionMapper(rleVersion)); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| if (child.get()) { |
| child->getColumnEncoding(encodings); |
| } |
| } |
| |
| void ListColumnWriter::getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getStripeStatistics(stats); |
| if (child.get()) { |
| child->getStripeStatistics(stats); |
| } |
| } |
| |
| void ListColumnWriter::mergeStripeStatsIntoFileStats() { |
| ColumnWriter::mergeStripeStatsIntoFileStats(); |
| if (child.get()) { |
| child->mergeStripeStatsIntoFileStats(); |
| } |
| } |
| |
| void ListColumnWriter::getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getFileStatistics(stats); |
| if (child.get()) { |
| child->getFileStatistics(stats); |
| } |
| } |
| |
| void ListColumnWriter::mergeRowGroupStatsIntoStripeStats() { |
| ColumnWriter::mergeRowGroupStatsIntoStripeStats(); |
| if (child.get()) { |
| child->mergeRowGroupStatsIntoStripeStats(); |
| } |
| } |
| |
| void ListColumnWriter::createRowIndexEntry() { |
| ColumnWriter::createRowIndexEntry(); |
| if (child.get()) { |
| child->createRowIndexEntry(); |
| } |
| } |
| |
| void ListColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| lengthEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| void ListColumnWriter::reset() { |
| ColumnWriter::reset(); |
| if (child) { |
| child->reset(); |
| } |
| } |
| |
| void ListColumnWriter::writeDictionary() { |
| if (child) { |
| child->writeDictionary(); |
| } |
| } |
| |
| class MapColumnWriter : public ColumnWriter { |
| public: |
| MapColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| ~MapColumnWriter() override; |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void mergeStripeStatsIntoFileStats() override; |
| |
| virtual void mergeRowGroupStatsIntoStripeStats() override; |
| |
| virtual void createRowIndexEntry() override; |
| |
| virtual void writeIndex( |
| std::vector<proto::Stream> &streams) const override; |
| |
| virtual void recordPosition() const override; |
| |
| virtual void writeDictionary() override; |
| |
| virtual void reset() override; |
| |
| private: |
| std::unique_ptr<ColumnWriter> keyWriter; |
| std::unique_ptr<ColumnWriter> elemWriter; |
| std::unique_ptr<RleEncoder> lengthEncoder; |
| RleVersion rleVersion; |
| }; |
| |
| MapColumnWriter::MapColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options), |
| rleVersion(options.getRleVersion()){ |
| std::unique_ptr<BufferedOutputStream> lengthStream = |
| factory.createStream(proto::Stream_Kind_LENGTH); |
| lengthEncoder = createRleEncoder(std::move(lengthStream), |
| false, |
| rleVersion, |
| memPool, |
| options.getAlignedBitpacking()); |
| |
| if (type.getSubtypeCount() > 0) { |
| keyWriter = buildWriter(*type.getSubtype(0), factory, options); |
| } |
| |
| if (type.getSubtypeCount() > 1) { |
| elemWriter = buildWriter(*type.getSubtype(1), factory, options); |
| } |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| MapColumnWriter::~MapColumnWriter() { |
| // PASS |
| } |
| |
| void MapColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(&rowBatch); |
| if (mapBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to MapVectorBatch"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| int64_t* offsets = mapBatch->offsets.data() + offset; |
| const char* notNull = mapBatch->hasNulls ? |
| mapBatch->notNull.data() + offset : nullptr; |
| |
| uint64_t elemOffset = static_cast<uint64_t>(offsets[0]); |
| uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]); |
| |
| // translate offsets to lengths |
| for (uint64_t i = 0; i != numValues; ++i) { |
| offsets[i] = offsets[i + 1] - offsets[i]; |
| } |
| |
| lengthEncoder->add(offsets, numValues, notNull); |
| |
| // unnecessary to deal with null as keys and values are packed together |
| if (keyWriter.get()) { |
| keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues, nullptr); |
| } |
| if (elemWriter.get()) { |
| elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues, nullptr); |
| } |
| |
| if (enableIndex) { |
| if (!notNull) { |
| colIndexStatistics->increase(numValues); |
| } else { |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull[i]) { |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addLong(offsets[i]); |
| } |
| } |
| } |
| colIndexStatistics->increase(count); |
| if (count < numValues) { |
| colIndexStatistics->setHasNull(true); |
| } |
| } |
| } |
| } |
| |
| void MapColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_LENGTH); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(lengthEncoder->flush()); |
| streams.push_back(stream); |
| |
| if (keyWriter.get()) { |
| keyWriter->flush(streams); |
| } |
| if (elemWriter.get()) { |
| elemWriter->flush(streams); |
| } |
| } |
| |
| void MapColumnWriter::writeIndex( |
| std::vector<proto::Stream> &streams) const { |
| ColumnWriter::writeIndex(streams); |
| if (keyWriter.get()) { |
| keyWriter->writeIndex(streams); |
| } |
| if (elemWriter.get()) { |
| elemWriter->writeIndex(streams); |
| } |
| } |
| |
| uint64_t MapColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += lengthEncoder->getBufferSize(); |
| if (keyWriter.get()) { |
| size += keyWriter->getEstimatedSize(); |
| } |
| if (elemWriter.get()) { |
| size += elemWriter->getEstimatedSize(); |
| } |
| return size; |
| } |
| |
| void MapColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(RleVersionMapper(rleVersion)); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| if (keyWriter.get()) { |
| keyWriter->getColumnEncoding(encodings); |
| } |
| if (elemWriter.get()) { |
| elemWriter->getColumnEncoding(encodings); |
| } |
| } |
| |
| void MapColumnWriter::getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getStripeStatistics(stats); |
| if (keyWriter.get()) { |
| keyWriter->getStripeStatistics(stats); |
| } |
| if (elemWriter.get()) { |
| elemWriter->getStripeStatistics(stats); |
| } |
| } |
| |
| void MapColumnWriter::mergeStripeStatsIntoFileStats() { |
| ColumnWriter::mergeStripeStatsIntoFileStats(); |
| if (keyWriter.get()) { |
| keyWriter->mergeStripeStatsIntoFileStats(); |
| } |
| if (elemWriter.get()) { |
| elemWriter->mergeStripeStatsIntoFileStats(); |
| } |
| } |
| |
| void MapColumnWriter::getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getFileStatistics(stats); |
| if (keyWriter.get()) { |
| keyWriter->getFileStatistics(stats); |
| } |
| if (elemWriter.get()) { |
| elemWriter->getFileStatistics(stats); |
| } |
| } |
| |
| void MapColumnWriter::mergeRowGroupStatsIntoStripeStats() { |
| ColumnWriter::mergeRowGroupStatsIntoStripeStats(); |
| if (keyWriter.get()) { |
| keyWriter->mergeRowGroupStatsIntoStripeStats(); |
| } |
| if (elemWriter.get()) { |
| elemWriter->mergeRowGroupStatsIntoStripeStats(); |
| } |
| } |
| |
| void MapColumnWriter::createRowIndexEntry() { |
| ColumnWriter::createRowIndexEntry(); |
| if (keyWriter.get()) { |
| keyWriter->createRowIndexEntry(); |
| } |
| if (elemWriter.get()) { |
| elemWriter->createRowIndexEntry(); |
| } |
| } |
| |
| void MapColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| lengthEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| void MapColumnWriter::reset() { |
| ColumnWriter::reset(); |
| if (keyWriter) { |
| keyWriter->reset(); |
| } |
| if (elemWriter) { |
| elemWriter->reset(); |
| } |
| } |
| |
| void MapColumnWriter::writeDictionary() { |
| if (keyWriter) { |
| keyWriter->writeDictionary(); |
| } |
| if (elemWriter) { |
| elemWriter->writeDictionary(); |
| } |
| } |
| |
| class UnionColumnWriter : public ColumnWriter { |
| public: |
| UnionColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options); |
| |
| virtual void add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) override; |
| |
| virtual void flush(std::vector<proto::Stream>& streams) override; |
| |
| virtual uint64_t getEstimatedSize() const override; |
| |
| virtual void getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const override; |
| |
| virtual void getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const override; |
| |
| virtual void mergeStripeStatsIntoFileStats() override; |
| |
| virtual void mergeRowGroupStatsIntoStripeStats() override; |
| |
| virtual void createRowIndexEntry() override; |
| |
| virtual void writeIndex( |
| std::vector<proto::Stream> &streams) const override; |
| |
| virtual void recordPosition() const override; |
| |
| virtual void writeDictionary() override; |
| |
| virtual void reset() override; |
| |
| private: |
| std::unique_ptr<ByteRleEncoder> rleEncoder; |
| std::vector<std::unique_ptr<ColumnWriter>> children; |
| }; |
| |
| UnionColumnWriter::UnionColumnWriter(const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) : |
| ColumnWriter(type, factory, options) { |
| |
| std::unique_ptr<BufferedOutputStream> dataStream = |
| factory.createStream(proto::Stream_Kind_DATA); |
| rleEncoder = createByteRleEncoder(std::move(dataStream)); |
| |
| for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) { |
| children.push_back(buildWriter(*type.getSubtype(i), |
| factory, |
| options)); |
| } |
| |
| if (enableIndex) { |
| recordPosition(); |
| } |
| } |
| |
| void UnionColumnWriter::add(ColumnVectorBatch& rowBatch, |
| uint64_t offset, |
| uint64_t numValues, |
| const char* incomingMask) { |
| UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(&rowBatch); |
| if (unionBatch == nullptr) { |
| throw InvalidArgument("Failed to cast to UnionVectorBatch"); |
| } |
| |
| ColumnWriter::add(rowBatch, offset, numValues, incomingMask); |
| |
| const char* notNull = unionBatch->hasNulls ? |
| unionBatch->notNull.data() + offset : nullptr; |
| unsigned char * tags = unionBatch->tags.data() + offset; |
| uint64_t * offsets = unionBatch->offsets.data() + offset; |
| |
| std::vector<int64_t> childOffset(children.size(), -1); |
| std::vector<uint64_t> childLength(children.size(), 0); |
| |
| for (uint64_t i = 0; i != numValues; ++i) { |
| if (childOffset[tags[i]] == -1) { |
| childOffset[tags[i]] = static_cast<int64_t>(offsets[i]); |
| } |
| ++childLength[tags[i]]; |
| } |
| |
| rleEncoder->add(reinterpret_cast<char*>(tags), numValues, notNull); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| if (childLength[i] > 0) { |
| children[i]->add(*unionBatch->children[i], |
| static_cast<uint64_t>(childOffset[i]), |
| childLength[i], nullptr); |
| } |
| } |
| |
| // update stats |
| if (enableIndex) { |
| if (!notNull) { |
| colIndexStatistics->increase(numValues); |
| } else { |
| uint64_t count = 0; |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (notNull[i]) { |
| ++count; |
| if (enableBloomFilter) { |
| bloomFilter->addLong(tags[i]); |
| } |
| } |
| } |
| colIndexStatistics->increase(count); |
| if (count < numValues) { |
| colIndexStatistics->setHasNull(true); |
| } |
| } |
| } |
| } |
| |
| void UnionColumnWriter::flush(std::vector<proto::Stream>& streams) { |
| ColumnWriter::flush(streams); |
| |
| proto::Stream stream; |
| stream.set_kind(proto::Stream_Kind_DATA); |
| stream.set_column(static_cast<uint32_t>(columnId)); |
| stream.set_length(rleEncoder->flush()); |
| streams.push_back(stream); |
| |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->flush(streams); |
| } |
| } |
| |
| void UnionColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const { |
| ColumnWriter::writeIndex(streams); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->writeIndex(streams); |
| } |
| } |
| |
| uint64_t UnionColumnWriter::getEstimatedSize() const { |
| uint64_t size = ColumnWriter::getEstimatedSize(); |
| size += rleEncoder->getBufferSize(); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| size += children[i]->getEstimatedSize(); |
| } |
| return size; |
| } |
| |
| void UnionColumnWriter::getColumnEncoding( |
| std::vector<proto::ColumnEncoding>& encodings) const { |
| proto::ColumnEncoding encoding; |
| encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); |
| encoding.set_dictionarysize(0); |
| if (enableBloomFilter) { |
| encoding.set_bloomencoding(BloomFilterVersion::UTF8); |
| } |
| encodings.push_back(encoding); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->getColumnEncoding(encodings); |
| } |
| } |
| |
| void UnionColumnWriter::getStripeStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getStripeStatistics(stats); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->getStripeStatistics(stats); |
| } |
| } |
| |
| void UnionColumnWriter::mergeStripeStatsIntoFileStats() { |
| ColumnWriter::mergeStripeStatsIntoFileStats(); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->mergeStripeStatsIntoFileStats(); |
| } |
| } |
| |
| void UnionColumnWriter::getFileStatistics( |
| std::vector<proto::ColumnStatistics>& stats) const { |
| ColumnWriter::getFileStatistics(stats); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->getFileStatistics(stats); |
| } |
| } |
| |
| void UnionColumnWriter::mergeRowGroupStatsIntoStripeStats() { |
| ColumnWriter::mergeRowGroupStatsIntoStripeStats(); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->mergeRowGroupStatsIntoStripeStats(); |
| } |
| } |
| |
| void UnionColumnWriter::createRowIndexEntry() { |
| ColumnWriter::createRowIndexEntry(); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->createRowIndexEntry(); |
| } |
| } |
| |
| void UnionColumnWriter::recordPosition() const { |
| ColumnWriter::recordPosition(); |
| rleEncoder->recordPosition(rowIndexPosition.get()); |
| } |
| |
| void UnionColumnWriter::reset() { |
| ColumnWriter::reset(); |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->reset(); |
| } |
| } |
| |
| void UnionColumnWriter::writeDictionary() { |
| for (uint32_t i = 0; i < children.size(); ++i) { |
| children[i]->writeDictionary(); |
| } |
| } |
| |
| std::unique_ptr<ColumnWriter> buildWriter( |
| const Type& type, |
| const StreamsFactory& factory, |
| const WriterOptions& options) { |
| switch (static_cast<int64_t>(type.getKind())) { |
| case STRUCT: |
| return std::unique_ptr<ColumnWriter>( |
| new StructColumnWriter( |
| type, |
| factory, |
| options)); |
| case INT: |
| case LONG: |
| case SHORT: |
| return std::unique_ptr<ColumnWriter>( |
| new IntegerColumnWriter( |
| type, |
| factory, |
| options)); |
| case BYTE: |
| return std::unique_ptr<ColumnWriter>( |
| new ByteColumnWriter( |
| type, |
| factory, |
| options)); |
| case BOOLEAN: |
| return std::unique_ptr<ColumnWriter>( |
| new BooleanColumnWriter( |
| type, |
| factory, |
| options)); |
| case DOUBLE: |
| return std::unique_ptr<ColumnWriter>( |
| new DoubleColumnWriter( |
| type, |
| factory, |
| options, |
| false)); |
| case FLOAT: |
| return std::unique_ptr<ColumnWriter>( |
| new DoubleColumnWriter( |
| type, |
| factory, |
| options, |
| true)); |
| case BINARY: |
| return std::unique_ptr<ColumnWriter>( |
| new BinaryColumnWriter( |
| type, |
| factory, |
| options)); |
| case STRING: |
| return std::unique_ptr<ColumnWriter>( |
| new StringColumnWriter( |
| type, |
| factory, |
| options)); |
| case CHAR: |
| return std::unique_ptr<ColumnWriter>( |
| new CharColumnWriter( |
| type, |
| factory, |
| options)); |
| case VARCHAR: |
| return std::unique_ptr<ColumnWriter>( |
| new VarCharColumnWriter( |
| type, |
| factory, |
| options)); |
| case DATE: |
| return std::unique_ptr<ColumnWriter>( |
| new DateColumnWriter( |
| type, |
| factory, |
| options)); |
| case TIMESTAMP: |
| return std::unique_ptr<ColumnWriter>( |
| new TimestampColumnWriter( |
| type, |
| factory, |
| options)); |
| case DECIMAL: |
| if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) { |
| return std::unique_ptr<ColumnWriter>( |
| new Decimal64ColumnWriter( |
| type, |
| factory, |
| options)); |
| } else if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_128) { |
| return std::unique_ptr<ColumnWriter>( |
| new Decimal128ColumnWriter( |
| type, |
| factory, |
| options)); |
| } else { |
| throw NotImplementedYet("Decimal precision more than 38 is not " |
| "supported"); |
| } |
| case LIST: |
| return std::unique_ptr<ColumnWriter>( |
| new ListColumnWriter( |
| type, |
| factory, |
| options)); |
| case MAP: |
| return std::unique_ptr<ColumnWriter>( |
| new MapColumnWriter( |
| type, |
| factory, |
| options)); |
| case UNION: |
| return std::unique_ptr<ColumnWriter>( |
| new UnionColumnWriter( |
| type, |
| factory, |
| options)); |
| default: |
| throw NotImplementedYet("Type is not supported yet for creating " |
| "ColumnWriter."); |
| } |
| } |
| } |