| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| #include <cstddef> |
| #include <memory> |
| #include <vector> |
| #include <Core/Block.h> |
| #include <Core/Settings.h> |
| #include <Interpreters/Context.h> |
| #include <Interpreters/TemporaryDataOnDisk.h> |
| #include <Shuffle/ShuffleCommon.h> |
| #include <jni/CelebornClient.h> |
| #include <Common/GlutenConfig.h> |
| #include <Common/QueryContext.h> |
| |
| namespace DB |
| { |
| class MergingSortedAlgorithm; |
| namespace Setting |
| { |
| extern const SettingsUInt64 prefer_external_sort_block_bytes; |
| } |
| } |
| |
| namespace local_engine |
| { |
| |
| struct SpillInfo |
| { |
| std::string spilled_file; |
| std::map<size_t, std::pair<size_t, size_t>> partition_spill_infos; |
| }; |
| |
| class Partition |
| { |
| public: |
| Partition() = default; |
| ~Partition() = default; |
| |
| Partition(Partition && other) noexcept : blocks(std::move(other.blocks)) { } |
| |
| bool empty() const { return blocks.empty(); } |
| void addBlock(DB::Block block); |
| size_t spill(NativeWriter & writer); |
| size_t bytes() const { return cached_bytes; } |
| |
| private: |
| std::vector<DB::Block> blocks; |
| size_t cached_bytes = 0; |
| }; |
| |
| class CachedShuffleWriter; |
| using PartitionPtr = std::shared_ptr<Partition>; |
| class PartitionWriter : boost::noncopyable |
| { |
| friend class Spillable; |
| public: |
| PartitionWriter(const SplitOptions& options, LoggerPtr logger_); |
| virtual ~PartitionWriter() = default; |
| |
| void initialize(SplitResult * split_result_, const DB::Block & output_header_) |
| { |
| if (!init) |
| { |
| split_result = split_result_; |
| chassert(split_result != nullptr); |
| split_result->partition_lengths.resize(options.partition_num); |
| split_result->raw_partition_lengths.resize(options.partition_num); |
| output_header = output_header_; |
| init = true; |
| } |
| } |
| virtual String getName() const = 0; |
| |
| virtual void write(const PartitionInfo & info, DB::Block & block); |
| virtual bool useRSSPusher() const = 0; |
| virtual size_t evictPartitions() = 0; |
| |
| protected: |
| |
| size_t bytes() const; |
| |
| virtual bool worthToSpill(size_t cache_size) const; |
| |
| virtual bool supportsEvictSinglePartition() const { return false; } |
| |
| virtual size_t evictSinglePartition(size_t partition_id) |
| { |
| throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Evict single partition is not supported for {}", getName()); |
| } |
| |
| const SplitOptions & options; |
| MemoryConfig settings; |
| |
| std::vector<ColumnsBufferPtr> partition_block_buffer; |
| std::vector<PartitionPtr> partition_buffer; |
| |
| /// Only valid in celeborn partition writer |
| size_t last_partition_id; |
| SplitResult * split_result = nullptr; |
| DB::Block output_header; |
| LoggerPtr logger = nullptr; |
| bool init = false; |
| }; |
| |
| class Spillable |
| { |
| public: |
| struct ExtraData |
| { |
| std::vector<ColumnsBufferPtr> partition_block_buffer; |
| std::vector<PartitionPtr> partition_buffer; |
| }; |
| |
| Spillable(const SplitOptions& options_) : spill_options(options_) {} |
| virtual ~Spillable() = default; |
| const std::vector<SpillInfo> & getSpillInfos() const |
| { |
| return spill_infos; |
| } |
| |
| protected: |
| String getNextSpillFile(); |
| std::vector<SpillInfo> spill_infos; |
| const SplitOptions& spill_options; |
| }; |
| |
| class LocalPartitionWriter : public PartitionWriter, public Spillable |
| { |
| public: |
| explicit LocalPartitionWriter(const SplitOptions& options); |
| ~LocalPartitionWriter() override = default; |
| |
| String getName() const override { return "LocalPartitionWriter"; } |
| ExtraData getExtraData() |
| { |
| return {partition_block_buffer, partition_buffer}; |
| } |
| size_t evictPartitions() override; |
| bool useRSSPusher() const override { return false; } |
| }; |
| |
| class SortBasedPartitionWriter : public PartitionWriter |
| { |
| protected: |
| explicit SortBasedPartitionWriter(const SplitOptions& options, LoggerPtr logger) : PartitionWriter(options, logger) |
| { |
| max_merge_block_size = options.split_size; |
| max_sort_buffer_size = options.max_sort_buffer_size; |
| max_merge_block_bytes = QueryContext::globalContext()->getSettingsRef()[DB::Setting::prefer_external_sort_block_bytes]; |
| } |
| public: |
| String getName() const override { return "SortBasedPartitionWriter"; } |
| void write(const PartitionInfo & info, DB::Block & block) override; |
| size_t adaptiveBlockSize() const |
| { |
| size_t res = max_merge_block_size; |
| if (max_merge_block_bytes) |
| { |
| res = std::min(std::max(max_merge_block_bytes / (current_accumulated_bytes / current_accumulated_rows), 128UL), res); |
| } |
| return res; |
| } |
| |
| protected: |
| size_t max_merge_block_size = DB::DEFAULT_BLOCK_SIZE; |
| size_t max_sort_buffer_size = 1_GiB; |
| size_t max_merge_block_bytes = 0; |
| size_t current_accumulated_bytes = 0; |
| size_t current_accumulated_rows = 0; |
| DB::Chunks accumulated_blocks; |
| DB::Block output_header; |
| DB::Block sort_header; |
| DB::SortDescription sort_description; |
| }; |
| |
| class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public Spillable |
| { |
| public: |
| explicit MemorySortLocalPartitionWriter(const SplitOptions& options) |
| : SortBasedPartitionWriter(options, getLogger("MemorySortLocalPartitionWriter")), Spillable(options) |
| { |
| } |
| |
| ~MemorySortLocalPartitionWriter() override = default; |
| String getName() const override { return "MemorySortLocalPartitionWriter"; } |
| |
| size_t evictPartitions() override; |
| bool useRSSPusher() const override { return false; } |
| }; |
| |
| class MemorySortCelebornPartitionWriter : public SortBasedPartitionWriter |
| { |
| public: |
| explicit MemorySortCelebornPartitionWriter(const SplitOptions& options, std::unique_ptr<CelebornClient> celeborn_client_) |
| : SortBasedPartitionWriter(options, getLogger("MemorySortCelebornPartitionWriter")), celeborn_client(std::move(celeborn_client_)) |
| { |
| } |
| |
| String getName() const override { return "MemorySortCelebornPartitionWriter"; } |
| ~MemorySortCelebornPartitionWriter() override = default; |
| |
| bool useRSSPusher() const override { return true; } |
| |
| size_t evictPartitions() override; |
| private: |
| std::unique_ptr<CelebornClient> celeborn_client; |
| }; |
| |
| class CelebornPartitionWriter : public PartitionWriter |
| { |
| public: |
| CelebornPartitionWriter(const SplitOptions& options, std::unique_ptr<CelebornClient> celeborn_client); |
| ~CelebornPartitionWriter() override = default; |
| |
| String getName() const override { return "CelebornPartitionWriter"; } |
| bool useRSSPusher() const override { return true; } |
| size_t evictPartitions() override; |
| |
| protected: |
| bool supportsEvictSinglePartition() const override { return true; } |
| size_t evictSinglePartition(size_t partition_id) override; |
| private: |
| std::unique_ptr<CelebornClient> celeborn_client; |
| }; |
| } |