blob: bd1b32375c300b325fe9f9463a2517352b351fe5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PartitionWriter.h"
#include <filesystem>
#include <format>
#include <memory>
#include <vector>
#include <Columns/ColumnsNumber.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Transforms/SortingTransform.h>
#include <Storages/IO/CompressedWriteBuffer.h>
#include <Storages/IO/NativeWriter.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Common/BlockTypeUtils.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Error codes referenced in this translation unit; defined in the ClickHouse core.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
using namespace DB;
namespace local_engine
{
static const String PARTITION_COLUMN_NAME = "partition";
/// Binary-search the index of the LAST row in [start, column->size()) whose partition id
/// equals `partition_id`. The column must be sorted ascending by partition id (guaranteed
/// by the partial sort in SortBasedPartitionWriter::write).
/// Returns -1 when no row in the range carries `partition_id`.
int64_t searchLastPartitionIdIndex(ColumnPtr column, size_t start, size_t partition_id)
{
    const auto & int64_column = checkAndGetColumn<ColumnUInt64>(*column);
    int64_t low = start, high = static_cast<int64_t>(int64_column.size()) - 1;
    while (low <= high)
    {
        int64_t mid = low + (high - low) / 2;
        if (int64_column.get64(mid) > partition_id)
            high = mid - 1;
        else
            low = mid + 1;
        /// Guard the probe: when every row in the range has a partition id greater than
        /// `partition_id`, `high` drops below `start` (possibly to -1) and the previous
        /// unguarded get64(high) read out of bounds.
        if (high >= static_cast<int64_t>(start) && int64_column.get64(high) == partition_id)
            return high;
    }
    return -1;
}
/// A spill is warranted either when the cached bytes reach the configured spill
/// threshold, or when the current thread group's memory usage ratio is too high.
bool PartitionWriter::worthToSpill(size_t cache_size) const
{
    if (options.spill_threshold > 0 && cache_size >= options.spill_threshold)
        return true;
    return currentThreadGroupMemoryUsageRatio() > settings.spill_mem_ratio;
}
/// Split `block` into per-partition column buffers according to `partition_info`.
/// Buffers reaching split_size are flushed into partition blocks, and data is evicted
/// (per partition for Celeborn, wholesale for local files) under memory pressure.
void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & block)
{
    chassert(init);
    /// PartitionWriter::write is always the top frame that occupies evicting_or_writing
    Stopwatch watch;
    size_t current_cached_bytes = bytes();
    for (size_t partition_id = 0; partition_id < partition_info.partition_num; ++partition_id)
    {
        size_t from = partition_info.partition_start_points[partition_id];
        size_t length = partition_info.partition_start_points[partition_id + 1] - from;
        /// Make sure buffer size is no greater than split_size
        auto & block_buffer = partition_block_buffer[partition_id];
        auto & buffer = partition_buffer[partition_id];
        if (!block_buffer->empty() && block_buffer->size() + length >= options.split_size)
            buffer->addBlock(block_buffer->releaseColumns());
        /// Maintain the cached-bytes estimate incrementally instead of recomputing bytes().
        current_cached_bytes -= block_buffer->bytes();
        for (size_t col_i = 0; col_i < block.columns(); ++col_i)
            block_buffer->appendSelective(col_i, block, partition_info.partition_selector, from, length);
        current_cached_bytes += block_buffer->bytes();
        /// Only works for celeborn partition writer
        if (supportsEvictSinglePartition() && worthToSpill(current_cached_bytes))
        {
            /// Calculate average rows of each partition block buffer over the ring range
            /// (last_partition_id, partition_id].
            size_t avg_size = 0;
            size_t cnt = 0;
            for (size_t i = (last_partition_id + 1) % options.partition_num; i != (partition_id + 1) % options.partition_num;
                 i = (i + 1) % options.partition_num)
            {
                avg_size += partition_block_buffer[i]->size();
                ++cnt;
            }
            /// When partition_id == last_partition_id the ring range is empty (cnt == 0);
            /// the previous unconditional division crashed with integer division by zero.
            if (cnt > 0)
                avg_size /= cnt;
            for (size_t i = (last_partition_id + 1) % options.partition_num; i != (partition_id + 1) % options.partition_num;
                 i = (i + 1) % options.partition_num)
            {
                /// NOTE(review): evictSinglePartition flushes the block buffer unconditionally
                /// when non-empty; the avg-size heuristic here only decides whether those bytes
                /// are subtracted from the running estimate — confirm the under-accounting for
                /// below-average partitions is intended.
                bool flush_block_buffer = partition_block_buffer[i]->size() >= avg_size;
                current_cached_bytes -= flush_block_buffer
                    ? partition_block_buffer[i]->bytes() + partition_buffer[i]->bytes()
                    : partition_buffer[i]->bytes();
                evictSinglePartition(i);
            }
            last_partition_id = partition_id;
        }
    }
    /// Only works for local partition writer
    if (!supportsEvictSinglePartition() && worthToSpill(current_cached_bytes))
        evictPartitions();
    split_result->total_split_time += watch.elapsedNanoseconds();
}
/// Flush every in-memory partition buffer into one new spill file on local disk.
/// The file is a concatenation of per-partition compressed NativeWriter streams; each
/// non-empty partition's (offset, length) within the file is recorded in a SpillInfo so
/// the final merge phase can locate its bytes.
/// Returns the number of raw (uncompressed) bytes serialized.
size_t LocalPartitionWriter::evictPartitions()
{
    size_t res = 0;
    size_t spilled_bytes = 0;
    auto spill_to_file = [this, &res, &spilled_bytes]()
    {
        auto file = getNextSpillFile();
        WriteBufferFromFile output(file, options.io_buffer_size);
        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), options.compress_level);
        CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size);
        NativeWriter writer(compressed_output, output_header);
        SpillInfo info;
        info.spilled_file = file;
        Stopwatch serialization_time_watch;
        for (size_t partition_id = 0; partition_id < partition_buffer.size(); ++partition_id)
        {
            auto & buffer = partition_buffer[partition_id];
            auto & block_buffer = partition_block_buffer[partition_id];
            /// Move rows still accumulating in the column buffer into the partition first,
            /// so nothing is left behind in memory.
            if (!block_buffer->empty())
                buffer->addBlock(block_buffer->releaseColumns());
            if (buffer->empty())
                continue;
            std::pair<size_t, size_t> offsets;
            offsets.first = output.count();
            spilled_bytes += buffer->bytes();
            size_t written_bytes = buffer->spill(writer);
            res += written_bytes;
            /// sync() flushes the current compressed block so output.count() reflects the
            /// true end offset of this partition's compressed data in the file.
            compressed_output.sync();
            offsets.second = output.count() - offsets.first;
            split_result->raw_partition_lengths[partition_id] += written_bytes;
            info.partition_spill_infos[partition_id] = offsets;
        }
        spill_infos.emplace_back(info);
        split_result->total_compress_time += compressed_output.getCompressTime();
        split_result->total_write_time += compressed_output.getWriteTime();
        split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds();
        compressed_output.finalize();
        output.finalize();
    };
    Stopwatch spill_time_watch;
    spill_to_file();
    split_result->total_spill_time += spill_time_watch.elapsedNanoseconds();
    split_result->total_bytes_spilled += spilled_bytes;
    LOG_INFO(logger, "spill shuffle data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds());
    return res;
}
/// Build a unique spill file path: <local_dir>/<sub_dir>/<shuffle>_<map>_<this>_<seq>.
/// The directory and sub-directory are picked by hashing the file name, spreading
/// spill files across the configured local dirs.
String Spillable::getNextSpillFile()
{
    const auto file_name = std::to_string(static_cast<Int64>(spill_options.shuffle_id)) + "_"
        + std::to_string(static_cast<Int64>(spill_options.map_id)) + "_"
        + std::to_string(reinterpret_cast<Int64>(this)) + "_" + std::to_string(spill_infos.size());
    const auto hash = std::hash<std::string>()(file_name);
    const auto dir_index = hash % spill_options.local_dirs_list.size();
    const auto sub_dir_index = (hash / spill_options.local_dirs_list.size()) % spill_options.num_sub_dirs;
    const std::filesystem::path dir
        = std::filesystem::path(spill_options.local_dirs_list[dir_index]) / std::format("{:02x}", sub_dir_index);
    if (!std::filesystem::exists(dir))
        std::filesystem::create_directories(dir);
    return dir / file_name;
}
/// Accumulate `block` for sort-based shuffle: append a synthetic UInt64 "partition"
/// column (one partition id per row), partially sort the block by that column, and
/// cache the result as a chunk. Evicts all accumulated data when memory is tight.
void SortBasedPartitionWriter::write(const PartitionInfo & info, DB::Block & block)
{
    Stopwatch write_time_watch;
    /// Capture the output header (without the partition column) from the first block.
    if (output_header.columns() == 0)
        output_header = block.cloneEmpty();
    auto partition_column = ColumnUInt64::create();
    partition_column->reserve(block.rows());
    partition_column->getData().insert_assume_reserved(info.src_partition_num.begin(), info.src_partition_num.end());
    block.insert({std::move(partition_column), std::make_shared<DataTypeUInt64>(), PARTITION_COLUMN_NAME});
    /// The sort header (including the partition column) and the sort description are
    /// likewise fixed after the first block.
    if (sort_header.columns() == 0)
    {
        sort_header = block.cloneEmpty();
        sort_description.emplace_back(SortColumnDescription(PARTITION_COLUMN_NAME));
    }
    // partial sort
    sortBlock(block, sort_description);
    Chunk chunk;
    chunk.setColumns(block.getColumns(), block.rows());
    accumulated_blocks.emplace_back(std::move(chunk));
    current_accumulated_bytes += accumulated_blocks.back().allocatedBytes();
    current_accumulated_rows += accumulated_blocks.back().getNumRows();
    split_result->total_write_time += write_time_watch.elapsedNanoseconds();
    if (worthToSpill(current_accumulated_bytes))
        evictPartitions();
}
/// Local (file-based) partition writer: spills shuffle data to local disk files.
LocalPartitionWriter::LocalPartitionWriter(const SplitOptions & options)
    : PartitionWriter(options, getLogger("LocalPartitionWriter"))
    , Spillable(options)
{
}
/// Set up one column buffer and one partition block store per output partition, and
/// load the memory/spill settings from the global query context.
PartitionWriter::PartitionWriter(const SplitOptions & options, LoggerPtr logger_)
    : options(options)
    , partition_block_buffer(options.partition_num)
    , partition_buffer(options.partition_num)
    , last_partition_id(options.partition_num - 1)
    , logger(logger_)
{
    for (auto & block_buffer : partition_block_buffer)
        block_buffer = std::make_shared<ColumnsBuffer>(options.split_size);
    for (auto & buffer : partition_buffer)
        buffer = std::make_shared<Partition>();
    settings = MemoryConfig::loadFromContext(QueryContext::globalContext());
}
/// Total bytes currently cached in memory: the per-partition column buffers plus the
/// already-flushed partition blocks.
size_t PartitionWriter::bytes() const
{
    size_t total = 0;
    for (const auto & block_buffer : partition_block_buffer)
        total += block_buffer->bytes();
    for (const auto & partition : partition_buffer)
        total += partition->bytes();
    return total;
}
/// Merge-sort all accumulated (partially sorted) chunks by the synthetic partition
/// column and spill the merged stream into one local file. While scanning the sorted
/// stream, rows are cut at partition boundaries; each partition's (offset, length)
/// inside the file is recorded in a SpillInfo, and ranges that end up empty are erased.
/// Returns the accumulated bytes released by this eviction.
size_t MemorySortLocalPartitionWriter::evictPartitions()
{
    size_t res = 0;
    size_t spilled_bytes = 0;
    auto spill_to_file = [this, &res, &spilled_bytes]()
    {
        if (accumulated_blocks.empty())
            return;
        auto file = getNextSpillFile();
        WriteBufferFromFile output(file, options.io_buffer_size);
        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), options.compress_level);
        CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size);
        NativeWriter writer(compressed_output, output_header);
        SpillInfo info;
        info.spilled_file = file;
        Stopwatch serialization_time_watch;
        /// The sorter consumes (moves) accumulated_blocks and yields globally
        /// partition-ordered chunks.
        MergeSorter sorter(toShared(sort_header), std::move(accumulated_blocks), sort_description, adaptiveBlockSize(), 0);
        size_t cur_partition_id = 0;
        info.partition_spill_infos[cur_partition_id] = {0, 0};
        while (auto data = sorter.read())
        {
            Block serialized_block = sort_header.cloneWithColumns(data.detachColumns());
            const auto partitions = serialized_block.getByName(PARTITION_COLUMN_NAME).column;
            serialized_block.erase(PARTITION_COLUMN_NAME);
            size_t row_offset = 0;
            while (row_offset < serialized_block.rows())
            {
                auto last_idx = searchLastPartitionIdIndex(partitions, row_offset, cur_partition_id);
                /// -1: the current partition has no rows at this offset — close its byte
                /// range (sync so output.count() is exact) and open the next partition's.
                if (last_idx < 0)
                {
                    auto & last = info.partition_spill_infos[cur_partition_id];
                    compressed_output.sync();
                    last.second = output.count() - last.first;
                    cur_partition_id++;
                    info.partition_spill_infos[cur_partition_id] = {last.first + last.second, 0};
                    continue;
                }
                /// Fast path: the whole chunk belongs to the current partition.
                if (row_offset == 0 && last_idx == serialized_block.rows() - 1)
                {
                    auto count = writer.write(serialized_block);
                    split_result->raw_partition_lengths[cur_partition_id] += count;
                    break;
                }
                else
                {
                    /// Write only the current partition's slice, then advance; if the slice
                    /// does not reach the chunk's end, the partition is complete here.
                    auto cut_block = serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1);
                    auto count = writer.write(cut_block);
                    split_result->raw_partition_lengths[cur_partition_id] += count;
                    row_offset = last_idx + 1;
                    if (last_idx != serialized_block.rows() - 1)
                    {
                        auto & last = info.partition_spill_infos[cur_partition_id];
                        compressed_output.sync();
                        last.second = output.count() - last.first;
                        cur_partition_id++;
                        info.partition_spill_infos[cur_partition_id] = {last.first + last.second, 0};
                    }
                }
            }
        }
        /// Close the final partition's byte range.
        compressed_output.sync();
        auto & last = info.partition_spill_infos[cur_partition_id];
        last.second = output.count() - last.first;
        spilled_bytes = current_accumulated_bytes;
        res = current_accumulated_bytes;
        current_accumulated_bytes = 0;
        current_accumulated_rows = 0;
        /// Drop partitions that received no bytes so the merge phase skips them.
        std::erase_if(
            info.partition_spill_infos,
            [](const auto & item)
            {
                auto const & [key, value] = item;
                return value.second == 0;
            });
        spill_infos.emplace_back(info);
        split_result->total_compress_time += compressed_output.getCompressTime();
        split_result->total_io_time += compressed_output.getWriteTime();
        split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds();
        compressed_output.finalize();
        output.finalize();
    };
    Stopwatch spill_time_watch;
    spill_to_file();
    split_result->total_spill_time += spill_time_watch.elapsedNanoseconds();
    split_result->total_bytes_spilled += spilled_bytes;
    LOG_INFO(logger, "spill shuffle data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds());
    return res;
}
/// Merge-sort all accumulated (partially sorted) chunks by the synthetic partition
/// column and push each partition's compressed bytes to Celeborn as soon as a
/// partition boundary is crossed in the sorted stream.
/// Returns the accumulated bytes released by this eviction.
size_t MemorySortCelebornPartitionWriter::evictPartitions()
{
    size_t res = 0;
    size_t spilled_bytes = 0;
    auto spill_to_celeborn = [this, &res, &spilled_bytes]()
    {
        Stopwatch serialization_time_watch;
        /// Skip empty buffer
        if (accumulated_blocks.empty())
            return;
        WriteBufferFromOwnString output;
        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), options.compress_level);
        CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size);
        NativeWriter writer(compressed_output, output_header);
        /// The sorter consumes (moves) accumulated_blocks and yields globally
        /// partition-ordered chunks.
        MergeSorter sorter(toShared(sort_header), std::move(accumulated_blocks), sort_description, adaptiveBlockSize(), 0);
        size_t cur_partition_id = 0;
        /// Flush the compressed stream and push whatever bytes the in-memory buffer holds
        /// for the current partition, then reset the buffer for the next partition.
        auto push_to_celeborn = [&]()
        {
            compressed_output.sync();
            auto & data = output.str();
            if (!data.empty())
            {
                Stopwatch push_time_watch;
                celeborn_client->pushPartitionData(cur_partition_id, data.data(), data.size());
                split_result->total_io_time += push_time_watch.elapsedNanoseconds();
                split_result->partition_lengths[cur_partition_id] += data.size();
                split_result->total_bytes_written += data.size();
            }
            output.restart();
        };
        while (auto data = sorter.read())
        {
            Block serialized_block = sort_header.cloneWithColumns(data.detachColumns());
            const auto partitions = serialized_block.getByName(PARTITION_COLUMN_NAME).column;
            serialized_block.erase(PARTITION_COLUMN_NAME);
            size_t row_offset = 0;
            while (row_offset < serialized_block.rows())
            {
                auto last_idx = searchLastPartitionIdIndex(partitions, row_offset, cur_partition_id);
                /// -1: the current partition has no rows at this offset — push what it has
                /// and move on to the next partition id.
                if (last_idx < 0)
                {
                    push_to_celeborn();
                    cur_partition_id++;
                    continue;
                }
                /// Fast path: the whole chunk belongs to the current partition.
                if (row_offset == 0 && last_idx == serialized_block.rows() - 1)
                {
                    auto count = writer.write(serialized_block);
                    split_result->raw_partition_lengths[cur_partition_id] += count;
                    break;
                }
                /// Write only the current partition's slice; if it does not reach the
                /// chunk's end, the partition is complete and can be pushed.
                auto cut_block = serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1);
                auto count = writer.write(cut_block);
                split_result->raw_partition_lengths[cur_partition_id] += count;
                row_offset = last_idx + 1;
                if (last_idx != serialized_block.rows() - 1)
                {
                    push_to_celeborn();
                    cur_partition_id++;
                }
            }
        }
        /// Push the final partition's remaining bytes.
        push_to_celeborn();
        spilled_bytes = current_accumulated_bytes;
        res = current_accumulated_bytes;
        current_accumulated_bytes = 0;
        current_accumulated_rows = 0;
        split_result->total_compress_time += compressed_output.getCompressTime();
        split_result->total_io_time += compressed_output.getWriteTime();
        split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds();
        compressed_output.finalize();
        output.finalize();
    };
    Stopwatch spill_time_watch;
    spill_to_celeborn();
    split_result->total_spill_time += spill_time_watch.elapsedNanoseconds();
    split_result->total_bytes_spilled += spilled_bytes;
    LOG_INFO(logger, "spill shuffle data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds());
    return res;
}
/// Celeborn partition writer: takes ownership of the client used to push partition data.
CelebornPartitionWriter::CelebornPartitionWriter(const SplitOptions & options, std::unique_ptr<CelebornClient> celeborn_client_)
    : PartitionWriter(options, getLogger("CelebornPartitionWriter"))
    , celeborn_client(std::move(celeborn_client_))
{
}
/// Evict every partition by pushing each one to Celeborn in turn.
/// Returns the total number of raw bytes serialized.
size_t CelebornPartitionWriter::evictPartitions()
{
    size_t total_evicted = 0;
    for (size_t partition_id = 0; partition_id < options.partition_num; ++partition_id)
        total_evicted += evictSinglePartition(partition_id);
    return total_evicted;
}
/// Serialize one partition's cached blocks into a compressed in-memory buffer and push
/// the bytes to Celeborn. Returns the number of raw (uncompressed) bytes written.
size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id)
{
    size_t res = 0;
    size_t spilled_bytes = 0;
    auto spill_to_celeborn = [this, partition_id, &res, &spilled_bytes]()
    {
        Stopwatch serialization_time_watch;
        auto & buffer = partition_buffer[partition_id];
        auto & block_buffer = partition_block_buffer[partition_id];
        /// Move rows still accumulating in the column buffer into the partition first.
        if (!block_buffer->empty())
        {
            buffer->addBlock(block_buffer->releaseColumns());
        }
        /// Skip empty buffer
        if (buffer->empty())
            return;
        WriteBufferFromOwnString output;
        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), options.compress_level);
        CompressedWriteBuffer compressed_output(output, codec, options.io_buffer_size);
        NativeWriter writer(compressed_output, output_header);
        spilled_bytes += buffer->bytes();
        size_t written_bytes = buffer->spill(writer);
        res += written_bytes;
        /// Flush the compressed stream so output holds the complete partition payload.
        compressed_output.sync();
        Stopwatch push_time_watch;
        celeborn_client->pushPartitionData(partition_id, output.str().data(), output.str().size());
        split_result->partition_lengths[partition_id] += output.str().size();
        split_result->raw_partition_lengths[partition_id] += written_bytes;
        split_result->total_compress_time += compressed_output.getCompressTime();
        split_result->total_write_time += compressed_output.getWriteTime();
        /// NOTE(review): the push time is added to BOTH total_write_time and total_io_time
        /// here, while MemorySortCelebornPartitionWriter counts it only in total_io_time —
        /// confirm the double count is intended.
        split_result->total_write_time += push_time_watch.elapsedNanoseconds();
        split_result->total_io_time += push_time_watch.elapsedNanoseconds();
        split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds();
        split_result->total_bytes_written += output.str().size();
    };
    Stopwatch spill_time_watch;
    spill_to_celeborn();
    split_result->total_spill_time += spill_time_watch.elapsedNanoseconds();
    split_result->total_bytes_spilled += spilled_bytes;
    LOG_INFO(logger, "spill shuffle data {} bytes, use spill time {} ms", spilled_bytes, spill_time_watch.elapsedMilliseconds());
    return res;
}
/// Append a block to this partition's in-memory cache.
/// Empty blocks are dropped: serializing them would make the shuffle reader stop early.
void Partition::addBlock(DB::Block block)
{
    if (block.rows() == 0)
        return;
    cached_bytes += block.bytes();
    blocks.emplace_back(std::move(block));
}
/// Serialize all cached blocks through `writer`, releasing each block's memory as soon
/// as it has been written. Returns the number of raw (uncompressed) bytes written.
size_t Partition::spill(NativeWriter & writer)
{
    size_t total_written = 0;
    for (auto & block : blocks)
    {
        if (block.rows() == 0)
            continue;
        total_written += writer.write(block);
        /// Swap with an empty block right away to cap peak memory during the spill.
        DB::Block().swap(block);
    }
    blocks.clear();
    cached_bytes = 0;
    return total_written;
}
}