blob: 026418a223c46c7433150f26a2d854926948163e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils/VeloxWriterUtils.h"
#include <boost/algorithm/string.hpp>
#include "config/GlutenConfig.h"
#include "utils/ConfigExtractor.h"
#include "utils/Exception.h"
#include "memory/VeloxMemoryManager.h"
#include "velox/common/compression/Compression.h"
#include "velox/type/Type.h"
namespace gluten {
using namespace facebook::velox;
using namespace facebook::velox::parquet;
using namespace facebook::velox::common;
namespace {
// Value stored into GZipCodecOptions::windowBits when the configured gzip
// window size equals kGzipWindowSize4k (2^12 = 4096; presumably a 4 KiB
// window, as the name suggests).
constexpr int32_t kGzipWindowBits4k{12};
// ZSTD compression level applied when kParquetZSTDCompressionLevel is not
// present in the Spark configuration.
constexpr int32_t kZSTDDefaultCompressionLevel{3};
} // namespace
/// Builds the Parquet WriterOptions from a Spark configuration map.
///
/// Every key is optional. Keys consulted: kParquetBlockSize, kParquetBlockRows,
/// kParquetCompressionCodec, kParquetGzipWindowSize,
/// kParquetZSTDCompressionLevel, kSessionTimezone, kParquetDataPageSize,
/// kParquetWriterVersion and kParquetEnableDictionary.
///
/// Defaults when a key is absent:
///  - row group limits: 128MB / 100M rows;
///  - compression codec: snappy (also used for unrecognized codec names);
///  - ZSTD level: kZSTDDefaultCompressionLevel;
///  - data page size, data page version and dictionary flag are left at
///    whatever WriterOptions default-initializes them to.
///
/// @param sparkConfs Spark configuration as string key/value pairs.
/// @return Newly allocated writer options (never null).
/// @throws GlutenException if the codec is "brotli".
/// @throws std::invalid_argument or std::out_of_range (propagated from
///         std::stoll / std::stoi) if a numeric setting cannot be parsed.
std::unique_ptr<WriterOptions> makeParquetWriteOption(const std::unordered_map<std::string, std::string>& sparkConfs) {
  // Row group flush thresholds.
  int64_t maxRowGroupBytes = 134217728; // 128MB
  int64_t maxRowGroupRows = 100000000; // 100M
  if (auto it = sparkConfs.find(kParquetBlockSize); it != sparkConfs.end()) {
    maxRowGroupBytes = std::stoll(it->second);
  }
  if (auto it = sparkConfs.find(kParquetBlockRows); it != sparkConfs.end()) {
    maxRowGroupRows = std::stoll(it->second);
  }

  auto writeOption = std::make_unique<WriterOptions>();
  writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;

  // Compression codec. Names are matched case-insensitively.
  // Spark supports none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
  // Any name not matched below keeps the snappy default.
  auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
  if (auto codecIt = sparkConfs.find(kParquetCompressionCodec); codecIt != sparkConfs.end()) {
    const std::string& codecName = codecIt->second;
    if (boost::iequals(codecName, "snappy")) {
      compressionCodec = CompressionKind::CompressionKind_SNAPPY;
    } else if (boost::iequals(codecName, "gzip")) {
      compressionCodec = CompressionKind::CompressionKind_GZIP;
      // Only the 4k window size is translated into explicit codec options;
      // any other (or missing) value leaves codecOptions untouched.
      if (auto windowIt = sparkConfs.find(kParquetGzipWindowSize);
          windowIt != sparkConfs.end() && windowIt->second == kGzipWindowSize4k) {
        auto codecOptions = std::make_shared<parquet::arrow::util::GZipCodecOptions>();
        codecOptions->windowBits = kGzipWindowBits4k;
        writeOption->codecOptions = std::move(codecOptions);
      }
    } else if (boost::iequals(codecName, "lzo")) {
      compressionCodec = CompressionKind::CompressionKind_LZO;
    } else if (boost::iequals(codecName, "brotli")) {
      // brotli is always rejected by this function.
      throw GlutenException("Gluten+velox does not support write parquet using brotli.");
    } else if (boost::iequals(codecName, "lz4")) {
      compressionCodec = CompressionKind::CompressionKind_LZ4;
    } else if (boost::iequals(codecName, "zstd")) {
      compressionCodec = CompressionKind::CompressionKind_ZSTD;
      auto codecOptions = std::make_shared<parquet::arrow::util::CodecOptions>();
      auto levelIt = sparkConfs.find(kParquetZSTDCompressionLevel);
      codecOptions->compressionLevel =
          levelIt != sparkConfs.end() ? std::stoi(levelIt->second) : kZSTDDefaultCompressionLevel;
      writeOption->codecOptions = std::move(codecOptions);
    } else if (boost::iequals(codecName, "uncompressed") || boost::iequals(codecName, "none")) {
      compressionCodec = CompressionKind::CompressionKind_NONE;
    }
  }
  writeOption->compressionKind = compressionCodec;

  // The factory copies the thresholds by value so it stays valid after this
  // function returns. The predicate passed to LambdaFlushPolicy always returns
  // false and needs no captures.
  writeOption->flushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() {
    return std::make_unique<LambdaFlushPolicy>(maxRowGroupRows, maxRowGroupBytes, []() { return false; });
  };

  writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
  writeOption->arrowMemoryPool =
      getDefaultMemoryManager()->getOrCreateArrowMemoryPool("VeloxParquetWrite.ArrowMemoryPool");

  if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end()) {
    writeOption->dataPageSize = std::stoll(it->second);
  }

  // Only "v2" (case-insensitive) switches the flag on; other values leave the
  // WriterOptions default in place.
  if (auto it = sparkConfs.find(kParquetWriterVersion); it != sparkConfs.end()) {
    if (boost::iequals(it->second, "v2")) {
      writeOption->useParquetDataPageV2 = true;
    }
  }

  // When the key is present, anything other than "true" (case-insensitive)
  // disables dictionary encoding.
  if (auto it = sparkConfs.find(kParquetEnableDictionary); it != sparkConfs.end()) {
    writeOption->enableDictionary = boost::iequals(it->second, "true");
  }

  return writeOption;
}
} // namespace gluten