| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| |
| #include <Core/Block.h> |
| #include <Core/SortDescription.h> |
| #include <DataTypes/DataTypeFactory.h> |
| #include <Interpreters/ActionsDAG.h> |
| #include <Interpreters/Aggregator.h> |
| #include <Parser/CHColumnToSparkRow.h> |
| #include <Parser/RelMetric.h> |
| #include <Processors/QueryPlan/ISourceStep.h> |
| #include <Processors/QueryPlan/QueryPlan.h> |
| #include <Storages/SourceFromJavaIter.h> |
| #include <base/types.h> |
| #include <substrait/plan.pb.h> |
| |
| namespace local_engine |
| { |
| |
| std::string join(const DB::ActionsDAG::NodeRawConstPtrs & v, char c); |
| |
| class SerializedPlanParser; |
| class LocalExecutor; |
| class ParserContext; |
| class ExpressionParser; |
| |
| // Give a condition expression `cond_rel_`, found all columns with nullability that must not containt |
| // null after this filter. |
| // It's used to remove nullability of the columns for performance reason. |
| class NonNullableColumnsResolver |
| { |
| public: |
| explicit NonNullableColumnsResolver( |
| const DB::Block & header_, std::shared_ptr<const ParserContext> parser_context_, const substrait::Expression & cond_rel_); |
| ~NonNullableColumnsResolver() = default; |
| |
| // return column names |
| std::set<String> resolve(); |
| |
| private: |
| DB::Block header; |
| std::shared_ptr<const ParserContext> parser_context; |
| const substrait::Expression & cond_rel; |
| std::unique_ptr<ExpressionParser> expression_parser; |
| |
| std::set<String> collected_columns; |
| |
| void visit(const substrait::Expression & expr); |
| void visitNonNullable(const substrait::Expression & expr); |
| }; |
| |
| class SerializedPlanParser |
| { |
| private: |
| std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) const; |
| |
| public: |
| explicit SerializedPlanParser(std::shared_ptr<const ParserContext> parser_context_); |
| |
| /// visible for UT |
| DB::QueryPlanPtr parse(const substrait::Plan & plan); |
| std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan); |
| DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan) const; |
| /// |
| std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan); |
| |
| void addInputIter(jobject iter, bool materialize_input) |
| { |
| input_iters.emplace_back(iter); |
| materialize_inputs.emplace_back(materialize_input); |
| } |
| |
| std::pair<jobject, bool> getInputIter(size_t index) |
| { |
| if (index > input_iters.size()) |
| throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Index({}) is overflow input_iters's size({})", index, input_iters.size()); |
| return {input_iters[index], materialize_inputs[index]}; |
| } |
| |
| void addSplitInfo(std::string && split_info) { split_infos.emplace_back(std::move(split_info)); } |
| |
| int nextSplitInfoIndex() |
| { |
| if (split_info_index >= split_infos.size()) |
| throw DB::Exception( |
| DB::ErrorCodes::LOGICAL_ERROR, |
| "split info index out of range, split_info_index: {}, split_infos.size(): {}", |
| split_info_index, |
| split_infos.size()); |
| return split_info_index++; |
| } |
| |
| const String & nextSplitInfo() |
| { |
| auto next_index = nextSplitInfoIndex(); |
| return split_infos.at(next_index); |
| } |
| |
| RelMetricPtr getMetric() { return metrics.empty() ? nullptr : metrics.at(0); } |
| |
| std::vector<DB::QueryPlanPtr> extra_plan_holder; |
| |
| private: |
| DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack); |
| static void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::Plan & plan); |
| |
| std::vector<jobject> input_iters; |
| std::vector<std::string> split_infos; |
| int split_info_index = 0; |
| std::vector<bool> materialize_inputs; |
| DB::ContextPtr context; |
| std::shared_ptr<const ParserContext> parser_context; |
| std::vector<RelMetricPtr> metrics; |
| }; |
| |
| } |