blob: 0cea65f2d207b4b544e3face82ade57db43b6891 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Shuffle/ShuffleCommon.h>
#include <jni/CelebornClient.h>
#include <Common/GlutenConfig.h>
#include <Common/QueryContext.h>
/// Forward declarations from ClickHouse so this header does not need the full definitions.
namespace DB
{
/// NOTE(review): not referenced by any declaration in this header; presumably needed by the .cpp.
class MergingSortedAlgorithm;
namespace Setting
{
/// Read by SortBasedPartitionWriter's constructor to size merged blocks by bytes.
extern const SettingsUInt64 prefer_external_sort_block_bytes;
}
}
namespace local_engine
{
/// Bookkeeping for a single spill file.
struct SpillInfo
{
    /// Path of the file the data was spilled into.
    std::string spilled_file;

    /// Keyed by partition id; each value is a pair of sizes describing that partition's data.
    /// NOTE(review): presumably (offset, length) within spilled_file -- confirm against the writer.
    std::map<size_t, std::pair<size_t, size_t>> partition_spill_infos;
};
/// In-memory buffer of blocks that belong to one shuffle partition.
///
/// Move-only: declaring the move constructor suppresses the implicit copy operations.
class Partition
{
public:
    Partition() = default;
    ~Partition() = default;

    /// Takes over other's blocks together with their byte accounting.
    /// cached_bytes must travel with the blocks; otherwise bytes() would report 0 for a partition
    /// that still holds data. other's counter is reset because its blocks have been moved out.
    Partition(Partition && other) noexcept
        : blocks(std::move(other.blocks)), cached_bytes(std::exchange(other.cached_bytes, 0))
    {
    }

    bool empty() const { return blocks.empty(); }

    /// Defined out of line. NOTE(review): presumably appends the block and adds its size to cached_bytes.
    void addBlock(DB::Block block);

    /// Defined out of line; writes the buffered blocks with writer.
    /// NOTE(review): the return value is presumably the number of bytes written -- confirm.
    size_t spill(NativeWriter & writer);

    /// Cached byte count of the buffered blocks (maintained by the out-of-line members).
    size_t bytes() const { return cached_bytes; }

private:
    std::vector<DB::Block> blocks;
    size_t cached_bytes = 0;
};
/// Forward declaration; not referenced by the declarations visible in this header.
class CachedShuffleWriter;
/// Shared-ownership handle to a Partition, stored in PartitionWriter::partition_buffer and Spillable::ExtraData.
using PartitionPtr = std::shared_ptr<Partition>;
/// Abstract base for shuffle partition writers.
///
/// Owns per-partition buffers and fills the per-partition length vectors of a caller-supplied
/// SplitResult. Subclasses decide how buffered data is evicted (evictPartitions()) and whether
/// a remote shuffle pusher is used (useRSSPusher()).
class PartitionWriter : boost::noncopyable
{
    friend class Spillable;

public:
    PartitionWriter(const SplitOptions& options, LoggerPtr logger_);
    virtual ~PartitionWriter() = default;

    /// One-time initialization; every call after the first is ignored.
    ///
    /// @param split_result_  Receives results; its partition_lengths and raw_partition_lengths are
    ///                       resized to options.partition_num. Must be non-null (checked only via
    ///                       chassert) and must outlive this writer, which keeps the raw pointer.
    /// @param output_header_ Copied into output_header.
    void initialize(SplitResult * split_result_, const DB::Block & output_header_)
    {
        if (!init)
        {
            split_result = split_result_;
            chassert(split_result != nullptr);
            split_result->partition_lengths.resize(options.partition_num);
            split_result->raw_partition_lengths.resize(options.partition_num);
            output_header = output_header_;
            init = true;
        }
    }

    virtual String getName() const = 0;
    virtual void write(const PartitionInfo & info, DB::Block & block);
    /// True for the Celeborn writers in this file, false for the local ones.
    virtual bool useRSSPusher() const = 0;
    virtual size_t evictPartitions() = 0;

protected:
    size_t bytes() const;
    virtual bool worthToSpill(size_t cache_size) const;

    /// Writers that return true here must also override evictSinglePartition().
    virtual bool supportsEvictSinglePartition() const { return false; }

    /// Default implementation always throws NOT_IMPLEMENTED.
    virtual size_t evictSinglePartition(size_t partition_id)
    {
        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Evict single partition is not supported for {}", getName());
    }

    /// Reference to the caller's options; the SplitOptions object must outlive this writer.
    const SplitOptions & options;
    MemoryConfig settings;
    std::vector<ColumnsBufferPtr> partition_block_buffer;
    std::vector<PartitionPtr> partition_buffer;
    /// Only valid in celeborn partition writer.
    /// Zero-initialized so that reading it before the first assignment is not undefined behavior.
    size_t last_partition_id = 0;
    /// Non-owning; set once by initialize().
    SplitResult * split_result = nullptr;
    DB::Block output_header;
    LoggerPtr logger = nullptr;
    /// Set by initialize() so repeated calls are no-ops.
    bool init = false;
};
/// Mixin for partition writers that spill buffered data to files and keep a record of each spill.
class Spillable
{
public:
    /// Copies of a writer's per-partition buffer handles (see LocalPartitionWriter::getExtraData()).
    struct ExtraData
    {
        std::vector<ColumnsBufferPtr> partition_block_buffer;
        std::vector<PartitionPtr> partition_buffer;
    };

    /// Keeps a reference to options_, so the SplitOptions object must outlive this Spillable.
    /// explicit: a SplitOptions should never silently convert into a Spillable.
    explicit Spillable(const SplitOptions& options_) : spill_options(options_) {}
    virtual ~Spillable() = default;

    /// Spill records accumulated by the implementing writer.
    const std::vector<SpillInfo> & getSpillInfos() const
    {
        return spill_infos;
    }

protected:
    /// Defined out of line; yields the path to use for the next spill file.
    String getNextSpillFile();
    std::vector<SpillInfo> spill_infos;
    const SplitOptions& spill_options;
};
/// Partition writer that keeps data local (no RSS pusher) and can spill through Spillable.
class LocalPartitionWriter : public PartitionWriter, public Spillable
{
public:
    explicit LocalPartitionWriter(const SplitOptions& options);
    ~LocalPartitionWriter() override = default;
    String getName() const override { return "LocalPartitionWriter"; }

    /// Returns copies of the two per-partition buffer vectors. The vectors themselves are copied,
    /// but PartitionPtr is a std::shared_ptr, so the returned Partition objects are shared with
    /// this writer (ColumnsBufferPtr presumably behaves the same -- confirm).
    /// const: only reads members.
    ExtraData getExtraData() const
    {
        return {partition_block_buffer, partition_buffer};
    }

    size_t evictPartitions() override;
    bool useRSSPusher() const override { return false; }
};
/// Common base of the MemorySort* writers.
///
/// Accumulates incoming data in accumulated_blocks and carries a sort header/description.
/// NOTE(review): presumably rows are sorted by partition before being emitted -- see write() in the .cpp.
class SortBasedPartitionWriter : public PartitionWriter
{
protected:
    /// Row and buffer limits come from options. The byte limit comes from the global
    /// prefer_external_sort_block_bytes setting; 0 disables byte-based sizing in adaptiveBlockSize().
    explicit SortBasedPartitionWriter(const SplitOptions& options, LoggerPtr logger) : PartitionWriter(options, logger)
    {
        max_merge_block_size = options.split_size;
        max_sort_buffer_size = options.max_sort_buffer_size;
        max_merge_block_bytes = QueryContext::globalContext()->getSettingsRef()[DB::Setting::prefer_external_sort_block_bytes];
    }

public:
    String getName() const override { return "SortBasedPartitionWriter"; }
    void write(const PartitionInfo & info, DB::Block & block) override;

    /// Number of rows per merged output block.
    ///
    /// Starts from max_merge_block_size. If max_merge_block_bytes is non-zero and rows have been
    /// accumulated, the result is also capped at max_merge_block_bytes / average_row_bytes.
    /// That byte-derived cap is never below 128 rows (unless max_merge_block_size itself is smaller).
    size_t adaptiveBlockSize() const
    {
        size_t res = max_merge_block_size;
        // With no accumulated rows there is no average row width to adapt to, and dividing
        // by current_accumulated_rows would be a division by zero.
        if (max_merge_block_bytes && current_accumulated_rows)
        {
            // Integer division truncates to 0 when rows average under one byte; clamp to 1 so
            // the next division cannot divide by zero either.
            const size_t avg_row_bytes = std::max<size_t>(current_accumulated_bytes / current_accumulated_rows, 1);
            // Explicit <size_t>: a 128UL literal is not size_t on every platform, and std::max
            // requires both arguments to have the same type.
            res = std::min(std::max<size_t>(max_merge_block_bytes / avg_row_bytes, 128), res);
        }
        return res;
    }

protected:
    size_t max_merge_block_size = DB::DEFAULT_BLOCK_SIZE;
    size_t max_sort_buffer_size = 1_GiB;
    size_t max_merge_block_bytes = 0;
    size_t current_accumulated_bytes = 0;
    size_t current_accumulated_rows = 0;
    DB::Chunks accumulated_blocks;
    /// NOTE(review): shadows PartitionWriter::output_header, which initialize() fills in. Unqualified
    /// uses in this class and its subclasses refer to this member, not the base one -- confirm intended.
    DB::Block output_header;
    DB::Block sort_header;
    DB::SortDescription sort_description;
};
/// Sort-based writer that keeps data local (no RSS pusher) and records spills via Spillable.
class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public Spillable
{
public:
    /// Both bases receive the same options object by reference; it must outlive this writer.
    explicit MemorySortLocalPartitionWriter(const SplitOptions& options)
        : SortBasedPartitionWriter(options, getLogger("MemorySortLocalPartitionWriter"))
        , Spillable(options)
    {
    }

    ~MemorySortLocalPartitionWriter() override = default;

    String getName() const override { return "MemorySortLocalPartitionWriter"; }
    bool useRSSPusher() const override { return false; }

    /// Defined out of line.
    size_t evictPartitions() override;
};
/// Sort-based writer that pushes data through a Celeborn client (useRSSPusher() is true).
class MemorySortCelebornPartitionWriter : public SortBasedPartitionWriter
{
public:
    /// Takes ownership of celeborn_client_.
    explicit MemorySortCelebornPartitionWriter(const SplitOptions& options, std::unique_ptr<CelebornClient> celeborn_client_)
        : SortBasedPartitionWriter(options, getLogger("MemorySortCelebornPartitionWriter"))
        , celeborn_client(std::move(celeborn_client_))
    {
    }

    ~MemorySortCelebornPartitionWriter() override = default;

    String getName() const override { return "MemorySortCelebornPartitionWriter"; }
    bool useRSSPusher() const override { return true; }

    /// Defined out of line.
    size_t evictPartitions() override;

private:
    /// Owned Celeborn client. NOTE(review): presumably used by evictPartitions() to push data.
    std::unique_ptr<CelebornClient> celeborn_client;
};
/// Non-sorting partition writer that pushes data through a Celeborn client and can evict a
/// single partition (supportsEvictSinglePartition() is true).
class CelebornPartitionWriter : public PartitionWriter
{
public:
    /// Defined out of line; takes ownership of celeborn_client.
    CelebornPartitionWriter(const SplitOptions& options, std::unique_ptr<CelebornClient> celeborn_client);
    ~CelebornPartitionWriter() override = default;

    String getName() const override { return "CelebornPartitionWriter"; }
    bool useRSSPusher() const override { return true; }

    /// Defined out of line.
    size_t evictPartitions() override;

protected:
    bool supportsEvictSinglePartition() const override { return true; }

    /// Defined out of line; replaces the base implementation that throws NOT_IMPLEMENTED.
    size_t evictSinglePartition(size_t partition_id) override;

private:
    std::unique_ptr<CelebornClient> celeborn_client;
};
}