blob: aa88b42db1c6ab5d06d13058ccbebf9f156e4a34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ArrayJoinHelper.h"
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoin.h>
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Poco/Logger.h>
#include <Common/DebugUtils.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Error codes thrown by this file. They must be declared in DB::ErrorCodes, because the code below
/// refers to them as DB::ErrorCodes::LOGICAL_ERROR. A declaration inside an unnamed namespace would
/// be a separate, internal-linkage name that nothing here uses.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Settings read by this file (see addArrayJoinStep).
namespace Setting
{
extern const SettingsUInt64 max_block_size;
}
} // namespace DB
namespace local_engine
{
namespace ArrayJoinHelper
{
/// Locate the ARRAY_JOIN node of `actions_dag`.
///
/// @return pointer to the only ARRAY_JOIN node, or nullptr when the DAG contains none.
/// @throws DB::Exception (LOGICAL_ERROR) when more than one ARRAY_JOIN node is present.
const DB::ActionsDAG::Node * findArrayJoinNode(const DB::ActionsDAG & actions_dag)
{
    const DB::ActionsDAG::Node * found = nullptr;
    for (const auto & candidate : actions_dag.getNodes())
    {
        if (candidate.type != DB::ActionsDAG::ActionType::ARRAY_JOIN)
            continue;

        /// A second ARRAY_JOIN node is not supported by the generate rel translation.
        if (found != nullptr)
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect single ARRAY JOIN node in generate rel");

        found = &candidate;
    }
    return found;
}
/// The three consecutive pieces a generate rel's actions_dag is cut into by splitActionsDAGInGenerate.
/// The first and last pieces may turn out to be pure pass-throughs; callers can then skip them.
struct SplittedActionsDAGs
{
    DB::ActionsDAG before_array_join; ///< Computes the ARRAY JOIN arguments (may be skippable).
    DB::ActionsDAG array_join;        ///< Holds the single ARRAY_JOIN node.
    DB::ActionsDAG after_array_join;  ///< Everything applied to the ARRAY JOIN result (may be skippable).
};
/// Split actions_dag of generate rel into 3 parts: before array join + during array join + after array join.
///
/// - before_array_join: the actions needed to compute the children of the ARRAY_JOIN node;
/// - array_join:        the ARRAY_JOIN node itself;
/// - after_array_join:  the remaining actions, with unused actions removed.
///
/// @throws DB::Exception (LOGICAL_ERROR) when the DAG has no ARRAY_JOIN node, or (via findArrayJoinNode)
///         when it has more than one.
static SplittedActionsDAGs splitActionsDAGInGenerate(const DB::ActionsDAG & actions_dag)
{
    const auto * array_join_node = findArrayJoinNode(actions_dag);
    if (!array_join_node)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array join node in actions_dag");

    SplittedActionsDAGs res;

    /// First cut: the children of the ARRAY_JOIN node, plus whatever they depend on.
    std::unordered_set<const DB::ActionsDAG::Node *> first_split_nodes(
        array_join_node->children.begin(), array_join_node->children.end());
    auto first_split_result = actions_dag.split(std::move(first_split_nodes));
    res.before_array_join = std::move(first_split_result.first);

    /// The remainder is a separate DAG with its own nodes, so look the ARRAY_JOIN node up again there.
    array_join_node = findArrayJoinNode(first_split_result.second);
    if (!array_join_node)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array join node in actions_dag");

    /// Second cut: isolate the ARRAY_JOIN node from what is computed on top of its result.
    std::unordered_set<const DB::ActionsDAG::Node *> second_split_nodes = {array_join_node};
    auto second_split_result = first_split_result.second.split(std::move(second_split_nodes));
    res.array_join = std::move(second_split_result.first);

    second_split_result.second.removeUnusedActions();
    res.after_array_join = std::move(second_split_result.second);
    return res;
}
/// Build an ActionsDAG over the columns of `header` that ARRAY JOINs the column at `column_index`.
///
/// The ARRAY JOIN result keeps the input column's name and is registered with addOrReplaceInOutputs,
/// so it is meant to take the place of the original array column in the outputs.
///
/// @param header        block structure the DAG is built from (one input per column).
/// @param column_index  position of the Array-typed column to unnest.
/// @return the constructed DAG (returned by value; NRVO applies).
/// @throws DB::Exception when column_index is out of range (from safeGetByPosition),
///         or LOGICAL_ERROR when the selected column is not an Array.
DB::ActionsDAG applyArrayJoinOnOneColumn(const DB::Block & header, size_t column_index)
{
    /// Bind by reference: no need to copy the column/type/name triple just to inspect its type.
    /// safeGetByPosition rejects an out-of-range index instead of reading past the block.
    const auto & array_column = header.safeGetByPosition(column_index);
    if (!typeid_cast<const DB::DataTypeArray *>(array_column.type.get()))
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array column in array join");

    DB::ActionsDAG actions_dag(header.getColumnsWithTypeAndName());
    /// Relies on the DAG having one input node per header column, in header order.
    const auto * array_column_node = actions_dag.getInputs()[column_index];
    const auto & array_join_node = actions_dag.addArrayJoin(*array_column_node, array_column_node->result_name);
    actions_dag.addOrReplaceInOutputs(array_join_node);
    /// Plain return: `return std::move(local)` would suppress NRVO.
    return actions_dag;
}
/// Translate a generate rel whose `actions_dag` contains exactly one ARRAY_JOIN node into query plan steps.
///
/// Instead of evaluating arrayJoin inside an ExpressionStep, the DAG is split into
///   [Pre-projection ExpressionStep] -> ARRAY JOIN step -> [Post-projection ExpressionStep]
/// so that the ARRAY JOIN step honours max_block_size. This avoids OOM when several lateral view
/// explode/posexplode are used in Spark SQL. A pre/post projection is skipped when it would leave the
/// current header unchanged.
///
/// @param context  used to read the max_block_size setting.
/// @param plan     plan to append the steps to; it takes ownership of them.
/// @param actions_dag  the generate rel's actions; must contain one ARRAY_JOIN node.
/// @param is_left  forwarded to ArrayJoin::is_left.
/// @return non-owning pointers to the steps appended to `plan`, in the order they were added.
/// @throws DB::Exception (LOGICAL_ERROR) when `actions_dag` has no (or more than one) ARRAY_JOIN node.
std::vector<DB::IQueryPlanStep *>
addArrayJoinStep(DB::ContextPtr context, DB::QueryPlan & plan, const DB::ActionsDAG & actions_dag, bool is_left)
{
    if (!findArrayJoinNode(actions_dag))
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array join node in actions_dag");

    auto logger = getLogger("ArrayJoinHelper");
    LOG_TEST(logger, "original actions_dag:\n{}", debug::dumpActionsDAG(actions_dag));
    auto splitted_actions_dags = splitActionsDAGInGenerate(actions_dag);
    LOG_TEST(logger, "actions_dag before arrayJoin:\n{}", debug::dumpActionsDAG(splitted_actions_dags.before_array_join));
    LOG_TEST(logger, "actions_dag during arrayJoin:\n{}", debug::dumpActionsDAG(splitted_actions_dags.array_join));
    LOG_TEST(logger, "actions_dag after arrayJoin:\n{}", debug::dumpActionsDAG(splitted_actions_dags.after_array_join));

    /// A DAG may be skipped only if applying it would not change `header`, e.g.
    ///     0 : INPUT () (no column) String a
    ///     1 : INPUT () (no column) String b
    ///     Output nodes: 0, 1
    /// when the header is exactly (a, b). Comparing node/input/output counts alone is not enough:
    /// an all-INPUT DAG with outputs (1, 0) or (0, 0) reorders or duplicates columns. Likewise,
    /// inputs that do not line up with the header columns mean the step would reshape the block.
    auto is_noop_on_header = [](const DB::ActionsDAG & dag, const DB::Block & header) -> bool
    {
        const auto & inputs = dag.getInputs();
        if (dag.getNodes().size() != inputs.size() || dag.getOutputs() != inputs || inputs.size() != header.columns())
            return false;
        for (size_t i = 0; i < inputs.size(); ++i)
            if (inputs[i]->result_name != header.getByPosition(i).name)
                return false;
        return true;
    };

    std::vector<DB::IQueryPlanStep *> steps;

    /// Pre-projection before array join (optional)
    if (!is_noop_on_header(splitted_actions_dags.before_array_join, plan.getCurrentHeader()))
    {
        auto step_before_array_join
            = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), std::move(splitted_actions_dags.before_array_join));
        step_before_array_join->setStepDescription("Pre-projection In Generate");
        steps.emplace_back(step_before_array_join.get());
        plan.addStep(std::move(step_before_array_join));
    }

    /// ARRAY JOIN: unnests the column named after the ARRAY_JOIN node, limited by max_block_size.
    DB::ArrayJoin array_join;
    array_join.columns = DB::Names{findArrayJoinNode(splitted_actions_dags.array_join)->result_name};
    array_join.is_left = is_left;
    auto array_join_step = std::make_unique<DB::ArrayJoinStep>(
        plan.getCurrentHeader(), std::move(array_join), false, context->getSettingsRef()[DB::Setting::max_block_size]);
    array_join_step->setStepDescription("ARRAY JOIN In Generate");
    steps.emplace_back(array_join_step.get());
    plan.addStep(std::move(array_join_step));

    /// Post-projection after array join (optional)
    if (!is_noop_on_header(splitted_actions_dags.after_array_join, plan.getCurrentHeader()))
    {
        auto step_after_array_join
            = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), std::move(splitted_actions_dags.after_array_join));
        step_after_array_join->setStepDescription("Post-projection In Generate");
        steps.emplace_back(step_after_array_join.get());
        plan.addStep(std::move(step_after_array_join));
    }

    return steps;
}
}
} // namespace local_engine