blob: 15089c5ac843b7674d17afc8a9d5311e233073d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "dbcommon/utils/string-util.h"
#include "storage/format/orc/writer.h"
namespace orc {
StringColumnWriter::StringColumnWriter(const orc::Type *type,
WriterOptions *options)
: ColumnWriter(type, options),
dictionaryKeySizeThreshold(options->getDictKeySizeThreshold()) {
lengthRleCoder =
createRleCoder(false, options->getRleVersion(), ORCTypeKind::LONG,
options->getCompressionKind());
rowsRleCoder =
createRleCoder(false, options->getRleVersion(), ORCTypeKind::LONG,
options->getCompressionKind());
dictDataBufferedStream = createBlockCompressor(options->getCompressionKind());
directDataBufferedStream =
createBlockCompressor(options->getCompressionKind());
rows.reserve(DEFAULT_NUMBER_TUPLES_PER_BATCH);
if (dictionaryKeySizeThreshold == 0) {
useDictionaryEncoding = false;
doneDictionaryCheck = true;
}
}
void StringColumnWriter::writeVector(dbcommon::Vector *vector) {
ColumnWriter::writeVector(vector);
const char *notNull = nullptr;
std::unique_ptr<dbcommon::ByteBuffer> buf;
if (vector->hasNullValue()) {
buf = vector->getNullBuffer()->getReverseBools();
notNull = reinterpret_cast<const char *>(buf->data());
}
uint64_t numValues = vector->getNumOfRows();
const char **vals = vector->getValPtrs();
const uint64_t *lens = vector->getLengths();
for (uint64_t i = 0; i < numValues; i++) {
if (!notNull || notNull[i]) {
if (useDictionaryEncoding) {
rows.push_back(dictionary.add(vals[i], lens[i]));
} else {
directDataBufferedStream->write(vals[i], lens[i]);
}
if (writeStatsOn)
dynamic_cast<StringColumnStatisticsImpl *>(stripeColStats.get())
->updateString(vals[i], lens[i]);
if (createBloomFilter) bloomFilter->addString(vals[i], lens[i]);
}
}
if (!useDictionaryEncoding)
lengthRleCoder->write(const_cast<uint64_t *>(vector->getLengths()),
vector->getNumOfRows(), notNull);
}
void StringColumnWriter::writeStripe(proto::StripeFooter *stripeFooter,
proto::StripeStatistics *pb,
OutputStream *out) {
checkDictionaryEncoding();
if (rows.size() > 0) flushDictionary();
ColumnWriter::writeStripe(stripeFooter, pb, out);
if (useDictionaryEncoding) {
lengthRleCoder->flushToStream(out);
::orc::proto::Stream *lenStream = stripeFooter->add_streams();
lenStream->set_kind(::orc::proto::Stream_Kind::Stream_Kind_LENGTH);
lenStream->set_column(this->getColumnId());
lenStream->set_length(lengthRleCoder->getStreamSize());
dictDataBufferedStream->flushToStream(out);
::orc::proto::Stream *dataStream = stripeFooter->add_streams();
dataStream->set_kind(
::orc::proto::Stream_Kind::Stream_Kind_DICTIONARY_DATA);
dataStream->set_column(this->getColumnId());
dataStream->set_length(dictDataBufferedStream->getStreamSize());
rowsRleCoder->flushToStream(out);
::orc::proto::Stream *rowsStream = stripeFooter->add_streams();
rowsStream->set_kind(::orc::proto::Stream_Kind::Stream_Kind_DATA);
rowsStream->set_column(this->getColumnId());
rowsStream->set_length(rowsRleCoder->getStreamSize());
} else {
lengthRleCoder->flushToStream(out);
::orc::proto::Stream *lenStream = stripeFooter->add_streams();
lenStream->set_kind(::orc::proto::Stream_Kind::Stream_Kind_LENGTH);
lenStream->set_column(this->getColumnId());
lenStream->set_length(lengthRleCoder->getStreamSize());
directDataBufferedStream->flushToStream(out);
::orc::proto::Stream *dataStream = stripeFooter->add_streams();
dataStream->set_kind(::orc::proto::Stream_Kind::Stream_Kind_DATA);
dataStream->set_column(this->getColumnId());
dataStream->set_length(directDataBufferedStream->getStreamSize());
}
::orc::proto::ColumnEncoding *encodingBuilder = stripeFooter->add_columns();
if (useDictionaryEncoding)
encodingBuilder->set_dictionarysize(dictionary.size());
encodingBuilder->set_kind(getProtoColumnEncoding());
// clear the buffers
reset();
}
uint64_t StringColumnWriter::getEstimatedSpaceNeeded() {
if (useDictionaryEncoding)
return 4 * rows.size() + dictionary.sizeInBytes() +
ColumnWriter::getEstimatedSpaceNeeded();
else
return directDataBufferedStream->getEstimatedSpaceNeeded() +
lengthRleCoder->getEstimatedSpaceNeeded() +
ColumnWriter::getEstimatedSpaceNeeded();
}
void StringColumnWriter::addTypeToFooter(proto::Footer *footer) {
proto::Type *typ = footer->add_types();
if (type->getKind() == orc::ORCTypeKind::STRING) {
typ->set_kind(::orc::proto::Type_Kind::Type_Kind_STRING);
} else if (type->getKind() == orc::ORCTypeKind::VARCHAR) {
typ->set_kind(::orc::proto::Type_Kind::Type_Kind_VARCHAR);
typ->set_maximumlength(type->getMaximumLength());
} else if (type->getKind() == orc::ORCTypeKind::CHAR) {
typ->set_kind(::orc::proto::Type_Kind::Type_Kind_CHAR);
typ->set_maximumlength(type->getMaximumLength());
} else {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "unknown string/varchar/char type %d",
type->getKind());
}
}
orc::proto::ColumnEncoding_Kind StringColumnWriter::getProtoColumnEncoding() {
if (useDictionaryEncoding) {
switch (version) {
case RleVersion::RleVersion_0:
return orc::proto::ColumnEncoding_Kind_DICTIONARY_V0;
case RleVersion::RleVersion_1:
return orc::proto::ColumnEncoding_Kind_DICTIONARY;
case RleVersion::RleVersion_2:
return orc::proto::ColumnEncoding_Kind_DICTIONARY_V2;
default:
LOG_ERROR(ERRCODE_FEATURE_NOT_SUPPORTED, "Version %u not supported",
version);
}
} else {
switch (version) {
case RleVersion::RleVersion_0:
return orc::proto::ColumnEncoding_Kind_DIRECT_V0;
case RleVersion::RleVersion_1:
return orc::proto::ColumnEncoding_Kind_DIRECT;
case RleVersion::RleVersion_2:
return orc::proto::ColumnEncoding_Kind_DIRECT_V2;
default:
LOG_ERROR(ERRCODE_FEATURE_NOT_SUPPORTED, "VersionKind %u not supported",
version);
}
}
}
void StringColumnWriter::addBloomFilterEntry() {
ColumnWriter::addBloomFilterEntry();
checkDictionaryEncoding();
if (!useDictionaryEncoding && rows.size() > 0) flushDictionary();
}
void StringColumnWriter::checkDictionaryEncoding() {
if (!doneDictionaryCheck) {
double ratio = rows.size() > 0 ? static_cast<double>(dictionary.size()) /
static_cast<double>(rows.size())
: 0;
useDictionaryEncoding = ratio <= dictionaryKeySizeThreshold;
doneDictionaryCheck = true;
}
}
void StringColumnWriter::flushDictionary() {
std::vector<const char *> vals;
std::vector<uint64_t> lens;
std::vector<uint32_t> dumpOrder;
dictionary.dump(&vals, &lens, &dumpOrder);
std::vector<uint64_t> position;
uint64_t rowSize = rows.size();
position.resize(rowSize);
for (int i = 0; i < rowSize; ++i) {
position[i] = dumpOrder[rows[i]];
}
if (useDictionaryEncoding) {
uint64_t dictSize = lens.size();
for (uint64_t i = 0; i < dictSize; ++i)
dictDataBufferedStream->write(vals[i], lens[i]);
lengthRleCoder->write(lens.data(), dictSize, nullptr);
rowsRleCoder->write(position.data(), rowSize, nullptr);
} else {
for (uint64_t i = 0; i < rowSize; ++i) {
directDataBufferedStream->write(vals[position[i]], lens[position[i]]);
lengthRleCoder->write(&lens[position[i]], 1, nullptr);
}
}
rows.clear();
}
void StringColumnWriter::reset() {
ColumnWriter::reset();
dictionary.clear();
dictDataBufferedStream->reset();
lengthRleCoder->reset();
rowsRleCoder->reset();
directDataBufferedStream->reset();
stripeColStats->reset();
}
} // namespace orc