blob: e8892e64f67f087c2d0581aca6a658a585909d93 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Parser/RelParsers/RelParser.h>
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/Parquet/ParquetMeta.h>
#include <Storages/SubstraitSource/Delta/DeltaMeta.h>
#include <Storages/SubstraitSource/FormatFile.h>
#include <substrait/algebra.pb.h>
#include <Common/GlutenConfig.h>
/// Declaration of the error code used by the exceptions thrown from this header;
/// the variable itself is defined elsewhere.
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace local_engine
{
/// Callback signature shared by every replace*Node function declared below: it receives a mutable
/// ActionsDAG, the MergeTree table instance being read and the query context, and returns nothing.
using ReplaceDeltaNodeFunc = std::function<void(DB::ActionsDAG &, const MergeTreeTableInstance &, DB::ContextPtr)>;
/// Node-replacement hooks. Each one has exactly the ReplaceDeltaNodeFunc signature; the definitions
/// are not part of this header.
/// NOTE(review): judging by the names only, each hook rewrites the DAG node of one file-level or
/// Delta metadata column (input file name, file path, file size, block start/length, modification
/// time, row-deleted flag, temporary row index) - confirm against the implementation file.
void replaceInputFileNameNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceFilePathNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceFileNameNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceFileSizeNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceFileBlockStartNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceFileBlockLengthNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceFileModificationTimeNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceDeltaInternalRowDeletedNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceInputFileBlockStartNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceInputFileBlockLengthNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
void replaceTmpRowIndexNode(DB::ActionsDAG & actions_dag, const MergeTreeTableInstance & merge_tree_table, DB::ContextPtr context);
/// Returns a reference to a map from a String key to a tuple of
/// (optional String, data type, ReplaceDeltaNodeFunc).
/// NOTE(review): presumably the key is a Delta metadata column name, the optional String an
/// alternative column name, and the callback one of the replace*Node hooks above - confirm
/// against the definition.
const std::unordered_map<String, std::tuple<std::optional<String>, DB::DataTypePtr, ReplaceDeltaNodeFunc>> & getDeltaMetaColumnMap();
/// RelParser specialization for MergeTree reads.
///
/// The generic RelParser entry points parse() and getSingleInput() are deliberately unusable here:
/// both unconditionally throw LOGICAL_ERROR. Callers are expected to use parseReadRel() instead.
class MergeTreeRelParser : public RelParser
{
public:
    /// String constant holding the "_part" virtual column name.
    inline static const std::string VIRTUAL_COLUMN_PART = "_part";

    /// Keeps a reference-counted handle to the query context and loads the Spark SQL configuration
    /// from it. All members are set in the initializer list so that spark_sql_config is constructed
    /// once instead of being default-constructed and then assigned.
    explicit MergeTreeRelParser(ParserContextPtr parser_context_, const DB::ContextPtr & context_)
        : RelParser(std::move(parser_context_)), context(context_), spark_sql_config(SparkSQLConfig::loadFromContext(context_))
    {
    }

    ~MergeTreeRelParser() override = default;

    /// Not supported by this parser: always throws DB::Exception with LOGICAL_ERROR.
    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override
    {
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser can't call parse(), call parseReadRel instead.");
    }

    /// The supported entry point: takes a ReadRel together with its extension table and returns a
    /// query plan. Defined outside this header.
    DB::QueryPlanPtr parseReadRel(
        DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel, const substrait::ReadRel::ExtensionTable & extension_table);

    /// Not supported by this parser: always throws DB::Exception with LOGICAL_ERROR.
    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel &) override
    {
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser can't call getSingleInput().");
    }

    /// NOTE(review): presumably evaluates the read's filter on the driver side and returns the
    /// selected ranges encoded as a String - confirm against the definition.
    String filterRangesOnDriver(const substrait::ReadRel & read_rel);

    /// One filter expression plus the statistics used to order it against other conditions.
    struct Condition
    {
        explicit Condition(const substrait::Expression & node_) : node(node_) { }

        /// Own copy of the expression (stored by value, so it does not depend on the argument's lifetime).
        const substrait::Expression node;
        /// NOTE(review): presumably the accumulated size of the referenced columns - confirm.
        size_t columns_size = 0;
        /// Set of column names associated with this condition.
        DB::NameSet table_columns;
        /// Defaults to a large sentinel (Int64 max - 1).
        /// NOTE(review): presumably means "touches no primary-key column" - confirm.
        Int64 min_position_in_primary_key = std::numeric_limits<Int64>::max() - 1;

        /// Sort key. The primary-key position is negated, so a condition with a larger position
        /// compares as smaller; ties are broken by columns_size and then by the number of table columns.
        auto tuple() const { return std::make_tuple(-min_position_in_primary_key, columns_size, table_columns.size()); }

        /// Lexicographic comparison of the tuple() keys.
        bool operator<(const Condition & rhs) const { return tuple() < rhs.tuple(); }
    };
    using Conditions = std::list<Condition>;

    // visible for test
    void analyzeExpressions(Conditions & res, const substrait::Expression & rel, std::set<Int64> & pk_positions, const DB::Block & block);

    /// Map from a column name to a UInt64 size.
    /// NOTE(review): presumably filled before prewhere optimization and read by getColumnsSize() - confirm.
    std::unordered_map<std::string, UInt64> column_sizes;

private:
    // The helpers below are defined outside this header; their exact behaviour is not visible here.
    DB::Block parseMergeTreeOutput(const substrait::ReadRel & rel, SparkStorageMergeTreePtr storage);
    DB::Block replaceDeltaNameIfNeeded(const DB::Block & output);
    void replaceNodeWithCaseSensitive(DB::Block & read_block, SparkStorageMergeTreePtr storage);
    void recoverDeltaNameIfNeeded(DB::QueryPlan & plan, const DB::Block & output, const MergeTreeTableInstance & merge_tree_table);
    void recoverNodeWithCaseSensitive(DB::QueryPlan & query_plan, const DB::Block & output);
    void parseToAction(DB::ActionsDAG & filter_action, const substrait::Expression & rel, std::string & filter_name) const;
    DB::PrewhereInfoPtr parsePreWhereInfo(const substrait::Expression & rel, const DB::Block & input);
    DB::ActionsDAG optimizePrewhereAction(const substrait::Expression & rel, std::string & filter_name, const DB::Block & block);
    String getCHFunctionName(const substrait::Expression_ScalarFunction & substrait_func) const;
    static void collectColumns(const substrait::Expression & rel, DB::NameSet & columns, const DB::Block & block);
    UInt64 getColumnsSize(const DB::NameSet & columns);

    /// Query context passed to the constructor.
    DB::ContextPtr context;
    /// Spark SQL settings loaded from the context at construction time.
    SparkSQLConfig spark_sql_config;
};
}