blob: f68730d98e0680e2fc31048758227665c3b66674 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <cstddef>
#include <Core/Block.h>
#include <Interpreters/Context_fwd.h>
#include <Processors/Chunk.h>
#include <Processors/IProcessor.h>
#include <Processors/Port.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace local_engine
{
class BranchStepHelper
{
public:
// Create a new query plan that would be used to build sub branch query plan.
static DB::QueryPlanPtr createSubPlan(const DB::SharedHeader & header, size_t num_streams);
};
// Use to branch the query plan.
class StaticBranchStep : public DB::ITransformingStep
{
public:
using BranchSelector = std::function<size_t(const std::list<DB::Chunk> &)>;
explicit StaticBranchStep(
const DB::ContextPtr & context_, const DB::SharedHeader & header, size_t branches, size_t sample_rows, BranchSelector selector);
~StaticBranchStep() override = default;
String getName() const override { return "StaticBranchStep"; }
// This will resize the num_streams to 1. You may need to resize after this.
void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & settings) override;
void describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const override;
protected:
void updateOutputHeader() override;
private:
DB::ContextPtr context;
DB::SharedHeader header;
size_t max_sample_rows;
size_t branches;
BranchSelector selector;
};
// It should be better to build execution branches on QueryPlan.
class UniteBranchesStep : public DB::ITransformingStep
{
public:
explicit UniteBranchesStep(
const DB::ContextPtr & context_, const DB::SharedHeader & header_, std::vector<DB::QueryPlanPtr> && branch_plans_, size_t num_streams_);
~UniteBranchesStep() override = default;
String getName() const override { return "UniteBranchesStep"; }
void transformPipeline(DB::QueryPipelineBuilder & pipelines, const DB::BuildQueryPipelineSettings &) override;
void describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const override;
private:
DB::ContextPtr context;
DB::SharedHeader header;
std::vector<DB::QueryPlanPtr> branch_plans;
size_t num_streams;
void updateOutputHeader() override { output_header = header; };
};
}