blob: 66ecedec268f9374c2dae02d1c910ee2f9a8547d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "orc/Common.hh"
#include "orc/OrcFile.hh"
#include "ColumnWriter.hh"
#include "Timezone.hh"
#include <memory>
namespace orc {
struct WriterOptionsPrivate {
uint64_t stripeSize;
uint64_t compressionBlockSize;
uint64_t rowIndexStride;
CompressionKind compression;
CompressionStrategy compressionStrategy;
MemoryPool* memoryPool;
double paddingTolerance;
std::ostream* errorStream;
FileVersion fileVersion;
double dictionaryKeySizeThreshold;
bool enableIndex;
std::set<uint64_t> columnsUseBloomFilter;
double bloomFilterFalsePositiveProb;
BloomFilterVersion bloomFilterVersion;
WriterOptionsPrivate() :
fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12
stripeSize = 64 * 1024 * 1024; // 64M
compressionBlockSize = 64 * 1024; // 64K
rowIndexStride = 10000;
compression = CompressionKind_ZLIB;
compressionStrategy = CompressionStrategy_SPEED;
memoryPool = getDefaultPool();
paddingTolerance = 0.0;
errorStream = &std::cerr;
dictionaryKeySizeThreshold = 0.0;
enableIndex = true;
bloomFilterFalsePositiveProb = 0.05;
bloomFilterVersion = UTF8;
}
};
// Creates options backed by a freshly constructed WriterOptionsPrivate,
// i.e. with all default settings.
WriterOptions::WriterOptions()
    : privateBits(new WriterOptionsPrivate()) {
}
// Copy constructor: allocates a new WriterOptionsPrivate that is a copy of
// the one held by rhs, so the two objects do not share state afterwards.
WriterOptions::WriterOptions(const WriterOptions& rhs)
    : privateBits(new WriterOptionsPrivate(*rhs.privateBits)) {
}
// Takes over the settings of a non-const rhs by exchanging privateBits
// with it instead of copying.
// NOTE(review): privateBits has no initializer here, so rhs presumably ends
// up holding an empty pointer and must not be queried afterwards -- confirm
// against the declaration of privateBits in the header.
WriterOptions::WriterOptions(WriterOptions& rhs) {
  rhs.privateBits.swap(privateBits);
}
// Copy assignment: replaces the current settings with a copy of those held
// by rhs. Returns *this.
WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) {
  if (this == &rhs) {
    // self-assignment: keep the current settings untouched
    return *this;
  }
  privateBits.reset(new WriterOptionsPrivate(*rhs.privateBits));
  return *this;
}
// Nothing to release by hand; privateBits cleans up after itself.
WriterOptions::~WriterOptions() = default;
// Returns the RLE version implied by the configured file version:
// RleVersion_1 for file version 0.11, RleVersion_2 for anything else.
RleVersion WriterOptions::getRleVersion() const {
  const bool isVersion011 =
      (privateBits->fileVersion == FileVersion::v_0_11());
  return isVersion011 ? RleVersion_1 : RleVersion_2;
}
// Stores the stripe size setting. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setStripeSize(uint64_t size) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.stripeSize = size;
  return *this;
}
uint64_t WriterOptions::getStripeSize() const {
return privateBits->stripeSize;
}
// Stores the compression block size setting. Returns *this so calls can be
// chained.
WriterOptions& WriterOptions::setCompressionBlockSize(uint64_t size) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.compressionBlockSize = size;
  return *this;
}
uint64_t WriterOptions::getCompressionBlockSize() const {
return privateBits->compressionBlockSize;
}
// Stores the row index stride and derives the enableIndex flag from it:
// a stride of 0 turns the index off, any other value turns it on.
// Returns *this so calls can be chained.
WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.rowIndexStride = stride;
  bits.enableIndex = (stride != 0);
  return *this;
}
uint64_t WriterOptions::getRowIndexStride() const {
return privateBits->rowIndexStride;
}
// Stores the dictionary key size threshold. getEnableDictionary() reports
// true only for values greater than 0. Returns *this for chaining.
WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.dictionaryKeySizeThreshold = val;
  return *this;
}
double WriterOptions::getDictionaryKeySizeThreshold() const {
return privateBits->dictionaryKeySizeThreshold;
}
// Stores the file version to write. Only versions 0.11 and 0.12 are
// accepted; any other version raises std::logic_error and leaves the
// stored version unchanged. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) {
  // Only Hive_0_11 and Hive_0_12 version are supported currently
  const bool supported =
      version.getMajor() == 0 &&
      (version.getMinor() == 11 || version.getMinor() == 12);
  if (!supported) {
    throw std::logic_error("Unsupported file version specified.");
  }
  privateBits->fileVersion = version;
  return *this;
}
// Returns a copy of the stored file version.
FileVersion WriterOptions::getFileVersion() const {
  const WriterOptionsPrivate& bits = *privateBits;
  return bits.fileVersion;
}
// Stores the compression kind. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setCompression(CompressionKind comp) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.compression = comp;
  return *this;
}
// Returns the stored compression kind.
CompressionKind WriterOptions::getCompression() const {
  const WriterOptionsPrivate& bits = *privateBits;
  return bits.compression;
}
// Stores the compression strategy. getAlignedBitpacking() is derived from
// this value. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setCompressionStrategy(
    CompressionStrategy strategy) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.compressionStrategy = strategy;
  return *this;
}
// Returns the stored compression strategy.
CompressionStrategy WriterOptions::getCompressionStrategy() const {
  const WriterOptionsPrivate& bits = *privateBits;
  return bits.compressionStrategy;
}
bool WriterOptions::getAlignedBitpacking() const {
return privateBits->compressionStrategy == CompressionStrategy ::CompressionStrategy_SPEED;
}
// Stores the padding tolerance. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.paddingTolerance = tolerance;
  return *this;
}
double WriterOptions::getPaddingTolerance() const {
return privateBits->paddingTolerance;
}
// Stores the given memory pool pointer; only the pointer is kept.
// Returns *this so calls can be chained.
WriterOptions& WriterOptions::setMemoryPool(MemoryPool* memoryPool) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.memoryPool = memoryPool;
  return *this;
}
// Returns the stored memory pool pointer.
MemoryPool* WriterOptions::getMemoryPool() const {
  const WriterOptionsPrivate& bits = *privateBits;
  return bits.memoryPool;
}
// Records the address of errStream as the error stream; the stream itself
// is not copied. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.errorStream = &errStream;
  return *this;
}
std::ostream* WriterOptions::getErrorStream() const {
return privateBits->errorStream;
}
bool WriterOptions::getEnableIndex() const {
return privateBits->enableIndex;
}
// Reports whether the dictionary key size threshold is strictly positive.
bool WriterOptions::getEnableDictionary() const {
  const double threshold = privateBits->dictionaryKeySizeThreshold;
  return threshold > 0.0;
}
// Replaces the stored set of bloom filter column ids with a copy of
// `columns`. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setColumnsUseBloomFilter(
    const std::set<uint64_t>& columns) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.columnsUseBloomFilter = columns;
  return *this;
}
// Reports whether `column` is in the stored set of bloom filter columns.
bool WriterOptions::isColumnUseBloomFilter(uint64_t column) const {
  const std::set<uint64_t>& bloomColumns = privateBits->columnsUseBloomFilter;
  return bloomColumns.count(column) != 0;
}
// Stores the bloom filter false positive probability; the value is not
// validated here. Returns *this so calls can be chained.
WriterOptions& WriterOptions::setBloomFilterFPP(double fpp) {
  WriterOptionsPrivate& bits = *privateBits;
  bits.bloomFilterFalsePositiveProb = fpp;
  return *this;
}
double WriterOptions::getBloomFilterFPP() const {
return privateBits->bloomFilterFalsePositiveProb;
}
// Deliberately no setter is provided for the bloom filter version because
// only UTF8 is supported for now.
// Returns the stored bloom filter version.
BloomFilterVersion WriterOptions::getBloomFilterVersion() const {
  const WriterOptionsPrivate& bits = *privateBits;
  return bits.bloomFilterVersion;
}
// Out-of-line destructor definition for Writer; it has no work to do.
Writer::~Writer() = default;
// Writer implementation that feeds row batches to a ColumnWriter and emits
// stripes, metadata, the file footer and the post script to an
// OutputStream.
class WriterImpl : public Writer {
private:
  std::unique_ptr<ColumnWriter> columnWriter;
  // compressed stream used for stripe footers, file footer and metadata
  std::unique_ptr<BufferedOutputStream> compressionStream;
  // uncompressed stream used for the post script
  std::unique_ptr<BufferedOutputStream> bufferedStream;
  std::unique_ptr<StreamsFactory> streamsFactory;
  OutputStream* outStream;
  WriterOptions options;
  const Type& type;
  // rows added to the stripe currently being built
  uint64_t stripeRows;
  // rows contained in stripes that were already written
  uint64_t totalRows;
  // rows added since the last row index entry was created
  uint64_t indexRows;
  // file offset at which the next stripe starts
  uint64_t currentOffset;
  proto::Footer fileFooter;
  proto::PostScript postScript;
  proto::StripeInformation stripeInfo;
  proto::Metadata metadata;

  static const char* magicId;
  static const WriterId writerId;

public:
  WriterImpl(
      const Type& t,
      OutputStream* stream,
      const WriterOptions& opts);

  std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size)
      const override;

  void add(ColumnVectorBatch& rowsToAdd) override;

  void close() override;

  void addUserMetadata(const std::string name, const std::string value) override;

private:
  // writes the file header and prepares footer, post script, first stripe
  void init();
  // resets the per-stripe bookkeeping
  void initStripe();
  // flushes the current stripe and its footer
  void writeStripe();
  void writeMetadata();
  void writeFileFooter();
  void writePostscript();
  // appends t and, recursively, its subtypes to footer's type list
  void buildFooterType(const Type& t, proto::Footer& footer, uint32_t& index);
  static proto::CompressionKind convertCompressionKind(
      const CompressionKind& kind);
};
// Magic bytes that init() writes at the very start of the file.
const char * WriterImpl::magicId = "ORC";
// Writer identifier that init() records in the file footer.
const WriterId WriterImpl::writerId = WriterId::ORC_CPP_WRITER;
// Sets up a writer for schema `t` that writes to `stream` using a private
// copy of `opts`. Construction already writes the file header (via init()).
WriterImpl::WriterImpl(
    const Type& t,
    OutputStream* stream,
    const WriterOptions& opts)
    : outStream(stream),
      options(opts),
      type(t) {
  // the column writer is built on top of the streams factory
  streamsFactory = createStreamsFactory(options, outStream);
  columnWriter = buildWriter(type, *streamsFactory, options);

  stripeRows = 0;
  totalRows = 0;
  indexRows = 0;
  currentOffset = 0;

  MemoryPool& pool = *options.getMemoryPool();
  const uint64_t blockSize = options.getCompressionBlockSize();

  // compression stream for stripe footer, file footer and metadata
  compressionStream = createCompressor(
      options.getCompression(),
      outStream,
      options.getCompressionStrategy(),
      1 * 1024 * 1024,  // buffer capacity: 1M
      blockSize,
      pool);

  // uncompressed stream for post script
  bufferedStream.reset(new BufferedOutputStream(
      pool,
      outStream,
      1024,  // buffer capacity: 1024 bytes
      blockSize));

  init();
}
std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size)
const {
return type.createRowBatch(size, *options.getMemoryPool());
}
// Appends all rows of rowsToAdd to the current stripe. With the row index
// enabled the batch is fed in chunks that never cross a row index stride
// boundary, and an index entry is created each time a stride fills up.
// Once the column writer's estimated size reaches the configured stripe
// size, the stripe is written out.
void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
  if (!options.getEnableIndex()) {
    // no index: hand over the whole batch at once
    stripeRows += rowsToAdd.numElements;
    columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements, nullptr);
  } else {
    const uint64_t stride = options.getRowIndexStride();
    for (uint64_t offset = 0; offset < rowsToAdd.numElements; ) {
      // take whatever is smaller: the rest of the batch or the rows still
      // missing to complete the current stride
      const uint64_t take = std::min(rowsToAdd.numElements - offset,
                                     stride - indexRows);
      columnWriter->add(rowsToAdd, offset, take, nullptr);

      offset += take;
      indexRows += take;
      stripeRows += take;

      if (indexRows >= stride) {
        columnWriter->createRowIndexEntry();
        indexRows = 0;
      }
    }
  }

  if (columnWriter->getEstimatedSize() >= options.getStripeSize()) {
    writeStripe();
  }
}
// Finishes the file: writes the pending stripe if it holds any rows, then
// the metadata, the file footer and the post script, and finally closes
// the output stream.
void WriterImpl::close() {
  const bool hasPendingRows = stripeRows > 0;
  if (hasPendingRows) {
    writeStripe();
  }

  // the file tail is written in this fixed order
  writeMetadata();
  writeFileFooter();
  writePostscript();

  outStream->close();
}
void WriterImpl::addUserMetadata(const std::string name, const std::string value){
proto::UserMetadataItem* userMetadataItem = fileFooter.add_metadata();
userMetadataItem->set_name(name);
userMetadataItem->set_value(value);
}
void WriterImpl::init() {
// Write file header
const static size_t magicIdLength = strlen(WriterImpl::magicId);
outStream->write(WriterImpl::magicId, magicIdLength);
currentOffset += magicIdLength;
// Initialize file footer
fileFooter.set_headerlength(currentOffset);
fileFooter.set_contentlength(0);
fileFooter.set_numberofrows(0);
fileFooter.set_rowindexstride(
static_cast<uint32_t>(options.getRowIndexStride()));
fileFooter.set_writer(writerId);
uint32_t index = 0;
buildFooterType(type, fileFooter, index);
// Initialize post script
postScript.set_footerlength(0);
postScript.set_compression(
WriterImpl::convertCompressionKind(options.getCompression()));
postScript.set_compressionblocksize(options.getCompressionBlockSize());
postScript.add_version(options.getFileVersion().getMajor());
postScript.add_version(options.getFileVersion().getMinor());
postScript.set_writerversion(WriterVersion_ORC_135);
postScript.set_magic("ORC");
// Initialize first stripe
initStripe();
}
// Resets the per-stripe bookkeeping: the stripe info starts at the current
// file offset with all lengths and the row count zeroed, and the stripe
// and index row counters restart at 0.
void WriterImpl::initStripe() {
  stripeRows = 0;
  indexRows = 0;

  stripeInfo.set_offset(currentOffset);
  stripeInfo.set_numberofrows(0);
  stripeInfo.set_indexlength(0);
  stripeInfo.set_datalength(0);
  stripeInfo.set_footerlength(0);
}
void WriterImpl::writeStripe() {
if (options.getEnableIndex() && indexRows != 0) {
columnWriter->createRowIndexEntry();
indexRows = 0;
} else {
columnWriter->mergeRowGroupStatsIntoStripeStats();
}
// dictionary should be written before any stream is flushed
columnWriter->writeDictionary();
std::vector<proto::Stream> streams;
// write ROW_INDEX streams
if (options.getEnableIndex()) {
columnWriter->writeIndex(streams);
}
// write streams like PRESENT, DATA, etc.
columnWriter->flush(streams);
// generate and write stripe footer
proto::StripeFooter stripeFooter;
for (uint32_t i = 0; i < streams.size(); ++i) {
*stripeFooter.add_streams() = streams[i];
}
std::vector<proto::ColumnEncoding> encodings;
columnWriter->getColumnEncoding(encodings);
for (uint32_t i = 0; i < encodings.size(); ++i) {
*stripeFooter.add_columns() = encodings[i];
}
// use GMT to guarantee TimestampVectorBatch from reader can write
// same wall clock time
stripeFooter.set_writertimezone("GMT");
// add stripe statistics to metadata
proto::StripeStatistics* stripeStats = metadata.add_stripestats();
std::vector<proto::ColumnStatistics> colStats;
columnWriter->getStripeStatistics(colStats);
for (uint32_t i = 0; i != colStats.size(); ++i) {
*stripeStats->add_colstats() = colStats[i];
}
// merge stripe stats into file stats and clear stripe stats
columnWriter->mergeStripeStatsIntoFileStats();
if (!stripeFooter.SerializeToZeroCopyStream(compressionStream.get())) {
throw std::logic_error("Failed to write stripe footer.");
}
uint64_t footerLength = compressionStream->flush();
// calculate data length and index length
uint64_t dataLength = 0;
uint64_t indexLength = 0;
for (uint32_t i = 0; i < streams.size(); ++i) {
if (streams[i].kind() == proto::Stream_Kind_ROW_INDEX ||
streams[i].kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8) {
indexLength += streams[i].length();
} else {
dataLength += streams[i].length();
}
}
// update stripe info
stripeInfo.set_indexlength(indexLength);
stripeInfo.set_datalength(dataLength);
stripeInfo.set_footerlength(footerLength);
stripeInfo.set_numberofrows(stripeRows);
*fileFooter.add_stripes() = stripeInfo;
currentOffset = currentOffset + indexLength + dataLength + footerLength;
totalRows += stripeRows;
columnWriter->reset();
initStripe();
}
// Serializes the collected metadata through the compression stream and
// records the flushed length in the post script.
// Throws std::logic_error if serialization fails.
void WriterImpl::writeMetadata() {
  const bool serialized =
      metadata.SerializeToZeroCopyStream(compressionStream.get());
  if (!serialized) {
    throw std::logic_error("Failed to write metadata.");
  }
  const uint64_t metadataLength = compressionStream->flush();
  postScript.set_metadatalength(metadataLength);
}
void WriterImpl::writeFileFooter() {
fileFooter.set_contentlength(currentOffset - fileFooter.headerlength());
fileFooter.set_numberofrows(totalRows);
// update file statistics
std::vector<proto::ColumnStatistics> colStats;
columnWriter->getFileStatistics(colStats);
for (uint32_t i = 0; i != colStats.size(); ++i) {
*fileFooter.add_statistics() = colStats[i];
}
if (!fileFooter.SerializeToZeroCopyStream(compressionStream.get())) {
throw std::logic_error("Failed to write file footer.");
}
postScript.set_footerlength(compressionStream->flush());
}
// Serializes the post script through the uncompressed buffered stream and
// then writes its length as a single trailing byte.
// Throws std::logic_error if serialization fails, or if the flushed post
// script is longer than 255 bytes: such a length does not fit into the one
// byte written here, and casting it down would silently store a wrong
// length.
void WriterImpl::writePostscript() {
  if (!postScript.SerializeToZeroCopyStream(bufferedStream.get())) {
    throw std::logic_error("Failed to write post script.");
  }
  const uint64_t maxPostscriptLength = 255;
  const uint64_t flushedLength = bufferedStream->flush();
  if (flushedLength > maxPostscriptLength) {
    throw std::logic_error("Post script is too long to be written.");
  }
  unsigned char psLength = static_cast<unsigned char>(flushedLength);
  outStream->write(&psLength, sizeof(unsigned char));
}
// Appends the protobuf description of `t` to footer's type list and then
// recurses into its subtypes in order. `index` holds the id of `t` on entry
// and is incremented once per descendant, so on return it holds the id of
// the last type added. Field names are recorded for STRUCT types only.
// Throws std::logic_error for a type kind not handled below.
void WriterImpl::buildFooterType(
    const Type& t,
    proto::Footer& footer,
    uint32_t & index) {
  proto::Type protoType;
  protoType.set_maximumlength(static_cast<uint32_t>(t.getMaximumLength()));
  protoType.set_precision(static_cast<uint32_t>(t.getPrecision()));
  protoType.set_scale(static_cast<uint32_t>(t.getScale()));

  switch (t.getKind()) {
    case BOOLEAN:   protoType.set_kind(proto::Type_Kind_BOOLEAN);   break;
    case BYTE:      protoType.set_kind(proto::Type_Kind_BYTE);      break;
    case SHORT:     protoType.set_kind(proto::Type_Kind_SHORT);     break;
    case INT:       protoType.set_kind(proto::Type_Kind_INT);       break;
    case LONG:      protoType.set_kind(proto::Type_Kind_LONG);      break;
    case FLOAT:     protoType.set_kind(proto::Type_Kind_FLOAT);     break;
    case DOUBLE:    protoType.set_kind(proto::Type_Kind_DOUBLE);    break;
    case STRING:    protoType.set_kind(proto::Type_Kind_STRING);    break;
    case BINARY:    protoType.set_kind(proto::Type_Kind_BINARY);    break;
    case TIMESTAMP: protoType.set_kind(proto::Type_Kind_TIMESTAMP); break;
    case LIST:      protoType.set_kind(proto::Type_Kind_LIST);      break;
    case MAP:       protoType.set_kind(proto::Type_Kind_MAP);       break;
    case STRUCT:    protoType.set_kind(proto::Type_Kind_STRUCT);    break;
    case UNION:     protoType.set_kind(proto::Type_Kind_UNION);     break;
    case DECIMAL:   protoType.set_kind(proto::Type_Kind_DECIMAL);   break;
    case DATE:      protoType.set_kind(proto::Type_Kind_DATE);      break;
    case VARCHAR:   protoType.set_kind(proto::Type_Kind_VARCHAR);   break;
    case CHAR:      protoType.set_kind(proto::Type_Kind_CHAR);      break;
    default:
      throw std::logic_error("Unknown type.");
  }

  // remember this type's own slot before the recursion advances `index`
  const int selfPos = static_cast<int>(index);
  *footer.add_types() = protoType;

  // only add subtypes' field names if this type is STRUCT
  const bool isStruct = (t.getKind() == STRUCT);
  const uint64_t subtypeCount = t.getSubtypeCount();
  for (uint64_t child = 0; child < subtypeCount; ++child) {
    // fetched anew on every pass because the recursion grows the type list
    proto::Type* self = footer.mutable_types(selfPos);
    if (isStruct) {
      self->add_fieldnames(t.getFieldName(child));
    }
    self->add_subtypes(++index);
    buildFooterType(*t.getSubtype(child), footer, index);
  }
}
// Converts the public CompressionKind into its protobuf counterpart by a
// plain cast.
// NOTE(review): this relies on both enums using the same numeric values --
// confirm against their definitions.
proto::CompressionKind WriterImpl::convertCompressionKind(
    const CompressionKind& kind) {
  const proto::CompressionKind converted =
      static_cast<proto::CompressionKind>(kind);
  return converted;
}
// Factory: creates a WriterImpl for the given schema, output stream and
// options, and returns it through the Writer interface.
std::unique_ptr<Writer> createWriter(
    const Type& type,
    OutputStream* stream,
    const WriterOptions& options) {
  std::unique_ptr<Writer> writer(new WriterImpl(type, stream, options));
  return writer;
}
}