blob: ea1445c605e1b97c7cc5a96284dfe4176bdccaf2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/RelMetric.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/BlockIterator.h>
namespace local_engine
{
/// Plain (address, size) pair describing a raw byte buffer.
/// The struct holds a raw pointer and has no destructor, so it does not release
/// the memory itself. NOTE(review): who owns the pointed-to memory is not visible
/// from this header — confirm against the code that fills it in.
struct SparkBuffer
{
    /// Start of the buffer; nullptr until a buffer is assigned.
    char * address = nullptr;
    /// Buffer length (presumably in bytes — TODO confirm); 0 until a buffer is assigned.
    size_t size = 0;
};
/// Executor object built from a query plan and a pipeline builder; derives from BlockIterator.
/// One instance per thread can be recorded in a thread-local pointer (see current_executor below).
class LocalExecutor : public BlockIterator
{
public:
    /// Returns the executor recorded for the calling thread,
    /// or std::nullopt when the thread-local pointer is null.
    static std::optional<LocalExecutor *> getCurrentExecutor()
    {
        if (!current_executor)
            return std::nullopt;
        return std::optional<LocalExecutor *>(current_executor);
    }

    /// Clears the calling thread's recorded executor pointer. The executor object itself is not destroyed.
    static void resetCurrentExecutor()
    {
        current_executor = nullptr;
    }

    /// @param query_plan      query plan handed to the executor
    /// @param pipeline        pipeline builder handed to the executor
    /// @param dump_pipeline_  presumably enables dumping pipeline runtime information
    ///                        (see dumpPipeline) — TODO confirm in the implementation
    LocalExecutor(DB::QueryPlanPtr query_plan, DB::QueryPipelineBuilderPtr pipeline, bool dump_pipeline_ = false);

    ~LocalExecutor();

    /// Row-oriented result accessor; defined out of line.
    SparkRowInfoPtr next();

    /// Columnar result accessor returning a pointer to a DB::Block; defined out of line.
    /// NOTE(review): lifetime/ownership of the returned block is not visible here — confirm.
    DB::Block * nextColumnar();

    /// Defined out of line; presumably reports whether another result is available — TODO confirm.
    bool hasNext();

    /// Defined out of line; presumably reports the fallback_mode flag — TODO confirm.
    bool fallbackMode() const;

    /// Stop execution, used when task receives shutdown command or executor receives SIGTERM signal
    void cancel() const;

    /// Passes a callback that receives a DB::QueryPipelineBuilder reference; defined out of line.
    void setSinks(const std::function<void(DB::QueryPipelineBuilder &)> & setter) const;

    /// Defined out of line.
    void execute();

    /// Defined out of line; returns a DB::Block by value.
    DB::Block getHeader();

    /// Returns the stored metric handle.
    RelMetricPtr getMetric() const
    {
        return metric;
    }

    /// Replaces the stored metric handle with a copy of `metric_`.
    void setMetric(const RelMetricPtr & metric_)
    {
        metric = metric_;
    }

    /// Takes over the contents of `extra_plan_holder_`.
    /// The argument is moved from, so the caller's vector is left in a moved-from state.
    void setExtraPlanHolder(std::vector<DB::QueryPlanPtr> & extra_plan_holder_)
    {
        extra_plan_holder = std::move(extra_plan_holder_);
    }

private:
    // In the case of fallback, there may be multiple native pipelines in one stage.
    // Whether a fallback has occurred can be determined by checking whether a LocalExecutor already exists.
    // Updated when the LocalExecutor is created and reset when the task ends.
    static thread_local LocalExecutor * current_executor;

    /// Converts a block into a SparkRowInfo; defined out of line.
    std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(const DB::Block & block) const;

    /// Defined out of line; presumably sets up `executor` — TODO confirm.
    void initPullingPipelineExecutor();

    /// Dump processor runtime information to log
    std::string dumpPipeline() const;

    // NOTE: keep the data members in this order. Declaration order fixes
    // construction and (reverse) destruction order.
    DB::QueryPipelineBuilderPtr query_pipeline_builder;
    DB::QueryPipeline query_pipeline;

    // executor for fallback or ResultTask
    std::unique_ptr<DB::PullingPipelineExecutor> executor = nullptr;

    // executor for ShuffleMapTask
    DB::PipelineExecutorPtr push_executor = nullptr;

    DB::Block header;
    bool dump_pipeline;
    std::unique_ptr<CHColumnToSparkRow> ch_column_to_spark_row;
    std::unique_ptr<SparkBuffer> spark_buffer;
    DB::QueryPlanPtr current_query_plan;
    RelMetricPtr metric;
    std::vector<DB::QueryPlanPtr> extra_plan_holder;
    bool fallback_mode = false;
};
}