blob: cf9ac43a88621e1a031ad2dfb27952666a6e042c [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <jni.h>
#include <Processors/ISink.h>
#include <Shuffle/PartitionWriter.h>
#include <Shuffle/SelectorBuilder.h>
#include <Shuffle/ShuffleCommon.h>
/// Forward declaration only: this header refers to DB::QueryPipelineBuilder solely by
/// reference (SparkExchangeManager::setSinksToPipeline), so its definition is not needed here.
namespace DB
{
class QueryPipelineBuilder;
}
namespace local_engine
{
/// Forward declarations for types this header only names through smart pointers
/// (std::unique_ptr<CelebornClient>, std::shared_ptr<PartitionWriter>).
/// NOTE(review): PartitionWriter is presumably also declared by <Shuffle/PartitionWriter.h>,
/// which is included above — confirm before treating this line as redundant.
class CelebornClient;
class PartitionWriter;
/// Pipeline sink (DB::ISink) that is wired to a shuffle PartitionWriter.
///
/// The sink exclusively owns its partitioner (a SelectorBuilder) and shares ownership of the
/// partition writer with whoever constructed it. SparkExchangeManager is declared a friend and
/// therefore has access to the private members below.
class SparkExchangeSink : public DB::ISink
{
    friend class SparkExchangeManager;

public:
    /// @param header                    Shared header forwarded to DB::ISink; also passed to initOutputHeader.
    /// @param partitioner_              Selector builder; ownership is transferred into the sink.
    /// @param partition_writer_         Partition writer; the sink keeps a shared reference to it.
    /// @param output_columns_indicies_  Column indices, copied into the sink.
    /// @param sort_writer_              Stored verbatim in `sort_writer`; how it is interpreted is
    ///                                  decided by code outside this header.
    ///
    /// `partition_writer_` must not be null: it is dereferenced during construction.
    SparkExchangeSink(
        const DB::SharedHeader & header,
        std::unique_ptr<SelectorBuilder> partitioner_,
        std::shared_ptr<PartitionWriter> partition_writer_,
        const std::vector<size_t> & output_columns_indicies_,
        bool sort_writer_)
        : DB::ISink(header)
        , partitioner(std::move(partitioner_))
        /// The parameter is already a by-value copy, so move it into the member instead of
        /// copying a second time (saves one atomic ref-count increment/decrement pair).
        , partition_writer(std::move(partition_writer_))
        , output_columns_indicies(output_columns_indicies_)
        , sort_writer(sort_writer_)
    {
        /// initOutputHeader is defined out of line; it is expected to populate `output_header`,
        /// which is handed to the writer right below, so the call order matters.
        initOutputHeader(*header);

        /// Only the member is used from here on: the parameter has been moved from.
        /// The writer receives the address of this sink's `split_result`.
        partition_writer->initialize(&split_result, output_header);
    }

    /// Name reported for this processor.
    String getName() const override
    {
        return "SparkExchangeSink";
    }

    /// Read-only view of the split result whose address was given to the partition writer.
    const SplitResult & getSplitResult() const
    {
        return split_result;
    }

    /// Read-only view of the output header that was passed to the partition writer.
    const DB::Block & getOutputHeader() const
    {
        return output_header;
    }

protected:
    /// DB::ISink hooks; implemented out of line.
    void consume(DB::Chunk block) override;
    void onFinish() override;

private:
    /// Implemented out of line; called once from the constructor with the input header.
    void initOutputHeader(const DB::Block & block);

    /// NOTE: declaration order is initialization order — keep it in sync with the constructor's
    /// initializer list.
    DB::Block output_header;
    std::unique_ptr<SelectorBuilder> partitioner;
    std::shared_ptr<PartitionWriter> partition_writer;
    std::vector<size_t> output_columns_indicies;
    bool sort_writer = false;
    /// Its address is handed to the partition writer in the constructor.
    SplitResult split_result;
};
/// Owning handle to a SelectorBuilder.
using SelectBuilderPtr = std::unique_ptr<SelectorBuilder>;
/// Factory signature: builds a SelectorBuilder from the given SplitOptions.
using SelectBuilderCreator = std::function<SelectBuilderPtr(const SplitOptions &)>;
/// Holds a set of SparkExchangeSink instances together with their partition writers and
/// exposes a combined SplitResult. All non-inline members are implemented out of line;
/// the notes below describe only what this declaration itself shows.
class SparkExchangeManager
{
public:
    /// @param header      Input block header.
    /// @param short_name  Short name string (presumably the partitioning scheme used to look up
    ///                    a creator in `partitioner_creators` — confirm in the implementation).
    /// @param options_    Split options.
    /// @param rss_pusher  Optional JNI object handle; defaults to nullptr.
    SparkExchangeManager(
        const DB::Block & header,
        const String & short_name,
        const SplitOptions & options_,
        jobject rss_pusher = nullptr);

    void initSinks(size_t num);
    void setSinksToPipeline(DB::QueryPipelineBuilder & pipeline) const;
    void pushBlock(const DB::Block & block);
    void finish();

    /// Returns a copy of the manager-level split result.
    [[nodiscard]] SplitResult getSplitResult() const { return split_result; }

private:
    /// Factory functions, one per selector builder kind; each matches SelectBuilderCreator.
    static SelectBuilderPtr createRoundRobinSelectorBuilder(const SplitOptions & options_);
    static SelectBuilderPtr createHashSelectorBuilder(const SplitOptions & options_);
    static SelectBuilderPtr createSingleSelectorBuilder(const SplitOptions & options_);
    static SelectBuilderPtr createRangeSelectorBuilder(const SplitOptions & options_);

    /// String-keyed registry of selector builder factories (defined out of line).
    static std::unordered_map<String, SelectBuilderCreator> partitioner_creators;

    void mergeSplitResult();
    std::vector<SpillInfo> gatherAllSpillInfo() const;
    std::vector<UInt64> mergeSpills(
        DB::WriteBuffer & data_file,
        const std::vector<SpillInfo> & spill_infos,
        const std::vector<Spillable::ExtraData> & extra_datas = {});

    /// Data members — the declaration order below is significant (initialization order).
    DB::Block input_header;
    std::vector<std::shared_ptr<SparkExchangeSink>> sinks;
    std::vector<std::shared_ptr<PartitionWriter>> partition_writers;
    /// Default-constructed, i.e. empty, until assigned.
    std::unique_ptr<CelebornClient> celeborn_client;
    SplitOptions options;
    SelectBuilderCreator partitioner_creator;
    std::vector<size_t> output_columns_indicies;
    bool use_sort_shuffle = false;
    bool use_rss = false;
    SplitResult split_result;
};
}