// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "pipeline_task.h"

#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
#include <stddef.h>

#include <ostream>

#include "pipeline/exec/operator.h"
#include "pipeline/pipeline.h"
#include "pipeline_fragment_context.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "runtime/thread_context.h"
#include "task_queue.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"

namespace doris {
class RuntimeState;
} // namespace doris

namespace doris::pipeline {

// Builds the task that runs instance `index` of `pipeline`.
// The operator chain is copied from the pipeline: `_source` is its first operator and `_root`
// its last (execute() pulls blocks from `_root` and pushes them into `_sink`).
// The task starts in NOT_READY; prepare() must move it to RUNNABLE before it is scheduled.
// `_pipeline_task_watcher` starts here, so the reported Task5TotalTime measures the time since
// construction.
// NOTE: the initializers of `_source`/`_root` read `_operators`, so this relies on `_operators`
// being declared before them in the header (members initialize in declaration order) and on
// the pipeline having at least one operator (`front()`/`back()` on an empty vector is UB).
PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
                           OperatorPtr& sink, PipelineFragmentContext* fragment_context,
                           RuntimeProfile* parent_profile)
        : _index(index),
          _pipeline(pipeline),
          _operators(pipeline->_operators),
          _source(_operators.front()),
          _root(_operators.back()),
          _sink(sink),
          _prepared(false),
          _opened(false),
          _state(state),
          _cur_state(PipelineTaskState::NOT_READY),
          _data_state(SourceState::DEPEND_ON_SOURCE),
          _fragment_context(fragment_context),
          _parent_profile(parent_profile) {
    _pipeline_task_watcher.start();
}

// Copies the task's stopwatch readings and plain counters into the RuntimeProfile counters
// registered by _init_profile(), so the profile shows up-to-date values.
// Called from close() and debug_string().
void PipelineTask::_fresh_profile_counter() {
    // Time spent blocked on the source / runtime filters, and the number of schedulings.
    COUNTER_SET(_wait_source_timer, static_cast<int64_t>(_wait_source_watcher.elapsed_time()));
    COUNTER_SET(_wait_bf_timer, static_cast<int64_t>(_wait_bf_watcher.elapsed_time()));
    COUNTER_SET(_schedule_counts, static_cast<int64_t>(_schedule_time));
    // Time spent blocked on the sink and waiting for a worker thread.
    COUNTER_SET(_wait_sink_timer, static_cast<int64_t>(_wait_sink_watcher.elapsed_time()));
    COUNTER_SET(_wait_worker_timer, static_cast<int64_t>(_wait_worker_watcher.elapsed_time()));
    // Task1..Task4 milestone values, which are recorded elsewhere in the task.
    COUNTER_SET(_begin_execute_timer, _begin_execute_time);
    COUNTER_SET(_eos_timer, _eos_time);
    COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
    COUNTER_SET(_dst_pending_finish_over_timer, _dst_pending_finish_over_time);
    // Task5: elapsed time since the task was constructed.
    const auto total_elapsed = static_cast<int64_t>(_pipeline_task_watcher.elapsed_time());
    COUNTER_SET(_pip_task_total_timer, total_elapsed);
}

void PipelineTask::_init_profile() {
    std::stringstream ss;
    ss << "PipelineTask"
       << " (index=" << _index << ")";
    auto* task_profile = new RuntimeProfile(ss.str());
    _parent_profile->add_child(task_profile, true, nullptr);
    _task_profile.reset(task_profile);
    _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");

    static const char* exec_time = "ExecuteTime";
    _exec_timer = ADD_TIMER(_task_profile, exec_time);
    _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
    _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
    _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
    _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT);
    _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
    _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
    _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);

    _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
    _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
    _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
    _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
    _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
    _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
    _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
    _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
    _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
    _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);

    _begin_execute_timer = ADD_TIMER(_task_profile, "Task1BeginExecuteTime");
    _eos_timer = ADD_TIMER(_task_profile, "Task2EosTime");
    _src_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task3SrcPendingFinishOverTime");
    _dst_pending_finish_over_timer = ADD_TIMER(_task_profile, "Task4DstPendingFinishOverTime");
    _pip_task_total_timer = ADD_TIMER(_task_profile, "Task5TotalTime");
    _close_pipeline_timer = ADD_TIMER(_task_profile, "Task6ClosePipelineTime");
}

// Prepares the task for execution:
//   1. creates the task profile and its counters (_init_profile),
//   2. prepares the sink, then every operator from source to root,
//   3. records the sink and the operator chain as profile info strings,
//   4. allocates the block reused by execute(), and
//   5. moves the task from NOT_READY to RUNNABLE.
// Expected to be called once, while the task is still NOT_READY (DCHECKed).
// @return the first error from a sink/operator prepare(), otherwise OK.
Status PipelineTask::prepare(RuntimeState* state) {
    DCHECK(_sink);
    DCHECK(_cur_state == PipelineTaskState::NOT_READY);
    _init_profile();
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_prepare_timer);
    RETURN_IF_ERROR(_sink->prepare(state));
    for (auto& o : _operators) {
        RETURN_IF_ERROR(o->prepare(state));
    }

    _task_profile->add_info_string("Sink",
                                   fmt::format("{}(dst_id={})", _sink->get_name(), _sink->id()));

    // Renders "[Op0(node_id=a), Op1(node_id=b), ...]". Operator names are passed as format
    // *arguments*, never as the format string itself, so a name containing '{' or '}' cannot
    // make fmt throw or mis-render (and no intermediate string is built per operator).
    fmt::memory_buffer operator_ids_str;
    fmt::format_to(operator_ids_str, "[");
    for (size_t i = 0; i < _operators.size(); i++) {
        fmt::format_to(operator_ids_str, "{}{}(node_id={})", i == 0 ? "" : ", ",
                       _operators[i]->get_name(), _operators[i]->id());
    }
    fmt::format_to(operator_ids_str, "]");
    _task_profile->add_info_string("OperatorIds(source2root)", fmt::to_string(operator_ids_str));

    _block = doris::vectorized::Block::create_unique();

    // The task must start out runnable so that the first execute() can do preparation work
    // (e.g. initialize runtime filters) before it may block.
    set_state(PipelineTaskState::RUNNABLE);
    _prepared = true;
    return Status::OK();
}

// Returns true while the task must keep waiting before it may execute, i.e. while the
// pipeline still has an unfinished dependency or the query context is not ready to execute.
// A canceled fragment never waits. Once the answer becomes false it is cached in
// `_dependency_finish`, and every later call returns false without re-checking.
bool PipelineTask::has_dependency() {
    if (!_dependency_finish) {
        // Cancellation releases the task regardless of any outstanding dependency.
        if (!_fragment_context->is_canceled()) {
            const bool still_waiting =
                    _pipeline->has_dependency() || !query_context()->is_ready_to_execute();
            if (still_waiting) {
                return true;
            }
        }
        // Every dependency is satisfied (or the fragment was canceled). Remember this so
        // later calls can skip the checks.
        _dependency_finish = true;
    }
    return false;
}

// Opens every operator in chain order (source first, root last), then the sink if one is set.
// Stops at the first failure and returns it. Only a fully successful open marks the task as
// opened. Time is charged to the task's total, CPU and OpenTime timers.
Status PipelineTask::_open() {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    SCOPED_TIMER(_open_timer);

    for (auto& op : _operators) {
        RETURN_IF_ERROR(op->open(_state));
    }
    if (_sink) {
        RETURN_IF_ERROR(_sink->open(_state));
    }

    _opened = true;
    return Status::OK();
}

// Records the queue this task is scheduled on. When set, execute(), finalize() and close()
// report the time they spent to it via update_statistics().
void PipelineTask::set_task_queue(TaskQueue* task_queue) {
    this->_task_queue = task_queue;
}

// Runs the task for at most one scheduling time slice.
//
// Until the task has been opened, each call (re)tries _open():
//   * PIP_WAIT_FOR_RF / PIP_WAIT_FOR_SC -> BLOCKED_FOR_RF / BLOCKED_FOR_SOURCE, return OK;
//   * any other error while has_dependency() is true -> BLOCKED_FOR_DEPENDENCY, return OK.
//     The error stays in _open_status, so the next call returns it immediately;
//   * any other error -> returned to the caller.
// After a successful open, the task may still block on a dependency, the source or the sink.
//
// It then loops: pull a block from `_root`, push it into `_sink`. The loop stops when the
// fragment is canceled, the source cannot be read (BLOCKED_FOR_SOURCE), the sink cannot be
// written (BLOCKED_FOR_SINK), more than THREAD_TIME_SLICE has been spent (a yield), or at
// end of stream.
//
// @param eos out: set to true when the source reports FINISHED or the sink returns
//            END_OF_FILE, otherwise false. On eos, _finish_p_dependency() is called so that
//            pipelines depending on this one can proceed.
// @return OK unless open, get_block or sink fails with a non-retryable error.
// On every exit the Defer reports `time_spent` to the task queue (if any) and adds the
// thread CPU time used by this call to TaskCpuTime and to the query's CPU statistics.
Status PipelineTask::execute(bool* eos) {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_TIMER(_exec_timer);
    SCOPED_ATTACH_TASK(_state);
    int64_t time_spent = 0;

    ThreadCpuStopWatch cpu_time_stop_watch;
    cpu_time_stop_watch.start();

    Defer defer {[&]() {
        if (_task_queue) {
            _task_queue->update_statistics(this, time_spent);
        }

        int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
        _task_cpu_timer->update(delta_cpu_time);
        auto cpu_qs = query_context()->get_cpu_statistics();
        if (cpu_qs) {
            cpu_qs->add_cpu_nanos(delta_cpu_time);
        }
    }};
    // The task is expected to be RUNNABLE when execute() is called.
    *eos = false;
    if (!_opened) {
        {
            SCOPED_RAW_TIMER(&time_spent);
            // A previous call may already have run _open() and failed. Wait-for-RF/SC results
            // are retryable, so open is attempted again; any other cached error is final and
            // is returned here.
            if (!_open_status.ok() && !_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>() &&
                !_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
                return _open_status;
            }
            // Open runs here without first checking dependencies (e.g. the second start RPC
            // may not have arrived yet). If an open error were returned directly, the query
            // would be canceled and removed, and that later RPC would no longer find it.
            // So while dependencies are pending the error is deferred (see below).
            _open_status = _open();
            if (_open_status.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
                set_state(PipelineTaskState::BLOCKED_FOR_RF);
                return Status::OK();
            } else if (_open_status.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
                set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
                return Status::OK();
            }
            // Open failed but dependencies are still pending: block and report the error on
            // a later call instead.
            if (!_open_status.ok() && has_dependency()) {
                set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
                return Status::OK();
            }
            // Open failed and nothing is pending: return the error so the query is canceled.
            RETURN_IF_ERROR(_open_status);
        }
        if (has_dependency()) {
            set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
            return Status::OK();
        }
        if (!source_can_read()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
            return Status::OK();
        }
        if (!sink_can_write()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SINK);
            return Status::OK();
        }
    }

    this->set_begin_execute_time();
    while (!_fragment_context->is_canceled()) {
        // If the last get_block reported MORE_DATA, keep pulling without asking the source.
        if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
            break;
        }
        if (!sink_can_write()) {
            set_state(PipelineTaskState::BLOCKED_FOR_SINK);
            break;
        }
        // Time slice used up: yield the worker while staying runnable.
        if (time_spent > THREAD_TIME_SLICE) {
            COUNTER_UPDATE(_yield_counts, 1);
            break;
        }
        SCOPED_RAW_TIMER(&time_spent);
        _block->clear_column_data(_root->row_desc().num_materialized_slots());
        auto* block = _block.get();

        // Pull block from operator chain
        {
            SCOPED_TIMER(_get_block_timer);
            _get_block_counter->update(1);
            RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
        }
        *eos = _data_state == SourceState::FINISHED;
        // Empty blocks are skipped, except the final (eos) one, which is always sunk.
        if (_block->rows() != 0 || *eos) {
            SCOPED_TIMER(_sink_timer);
            auto status = _sink->sink(_state, block, _data_state);
            // END_OF_FILE from the sink is not an error: it means the sink needs no more data.
            if (!status.is<ErrorCode::END_OF_FILE>()) {
                RETURN_IF_ERROR(status);
            }
            *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
            if (*eos) { // just return, the scheduler will do finish work
                break;
            }
        }
    }
    if (*eos) { // currently only join / set-operation nodes add dependencies; e.g. join probe can start once the join build sink reaches eos
        _finish_p_dependency();
    }

    return Status::OK();
}

// Finalizes the sink and returns its status.
// Time is charged to the task's total, CPU and FinalizeTime timers. The Defer is declared
// before SCOPED_TIMER(_finalize_timer), so it is destroyed after that timer has stopped. It
// therefore reports the already-updated FinalizeTime value to the task queue (if any).
// NOTE(review): _finalize_timer->value() is cumulative, so the reported figure assumes
// finalize() runs once per task.
Status PipelineTask::finalize() {
    SCOPED_TIMER(_task_profile->total_time_counter());
    SCOPED_CPU_TIMER(_task_cpu_timer);
    Defer defer {[&]() {
        if (_task_queue) {
            _task_queue->update_statistics(this, _finalize_timer->value());
        }
    }};
    SCOPED_TIMER(_finalize_timer);
    return _sink->finalize(_state);
}

// Asks the sink and then the source to begin closing. Only the first call does any work;
// later calls return OK. Both try_close() calls are always made. If the sink failed, its
// status is returned, otherwise the source's.
Status PipelineTask::try_close() {
    if (!_try_close_flag) {
        _try_close_flag = true;
        Status sink_status = _sink->try_close(_state);
        Status source_status = _source->try_close(_state);
        if (!sink_status.ok()) {
            return sink_status;
        }
        return source_status;
    }
    return Status::OK();
}

// Closes the sink and then every operator. All of them are closed even if an earlier close
// fails, and the first error is returned. The time spent closing is always reported to the
// task queue (if any). When the task was opened, the profile counters are refreshed, that
// time is stored in CloseTime, and it is added to the task's total time.
Status PipelineTask::close() {
    int64_t close_ns = 0;
    // Runs on function exit, after the timed scope below has written close_ns.
    Defer report_close_time {[&]() {
        if (_task_queue != nullptr) {
            _task_queue->update_statistics(this, close_ns);
        }
    }};

    Status result;
    {
        SCOPED_RAW_TIMER(&close_ns);
        result = _sink->close(_state);
        for (auto& op : _operators) {
            Status op_status = op->close(_state);
            // Keep only the first failure; later errors are dropped.
            if (result.ok() && !op_status.ok()) {
                result = op_status;
            }
        }
    }

    if (!_opened) {
        return result;
    }
    _fresh_profile_counter();
    COUNTER_SET(_close_timer, close_ns);
    COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
    return result;
}

// Returns the QueryContext of the fragment this task belongs to.
QueryContext* PipelineTask::query_context() {
    QueryContext* const ctx = _fragment_context->get_query_context();
    return ctx;
}

// Moves the task to `state`. See PipelineTaskState's comment for the state machine itself.
// Besides recording the new state, this keeps the wait statistics up to date:
//   * RUNNABLE -> BLOCKED_FOR_{SOURCE,SINK,RF}: starts the matching wait stopwatch. SOURCE and
//     SINK also bump their per-reason counter.
//   * any transition out of RUNNABLE (to whatever state) bumps NumBlockedTimes.
//   * BLOCKED_FOR_{SOURCE,SINK,RF} -> RUNNABLE: stops the matching wait stopwatch.
// Re-setting the current state is a no-op. Must not be called once FINISHED (DCHECKed).
void PipelineTask::set_state(PipelineTaskState state) {
    DCHECK(_cur_state != PipelineTaskState::FINISHED);
    if (state == _cur_state) {
        return;
    }

    const bool becomes_runnable = state == PipelineTaskState::RUNNABLE;
    switch (_cur_state) {
    case PipelineTaskState::BLOCKED_FOR_SOURCE: {
        if (becomes_runnable) {
            _wait_source_watcher.stop();
        }
        break;
    }
    case PipelineTaskState::BLOCKED_FOR_SINK: {
        if (becomes_runnable) {
            _wait_sink_watcher.stop();
        }
        break;
    }
    case PipelineTaskState::BLOCKED_FOR_RF: {
        if (becomes_runnable) {
            _wait_bf_watcher.stop();
        }
        break;
    }
    case PipelineTaskState::RUNNABLE: {
        COUNTER_UPDATE(_block_counts, 1);
        switch (state) {
        case PipelineTaskState::BLOCKED_FOR_SOURCE: {
            _wait_source_watcher.start();
            COUNTER_UPDATE(_block_by_source_counts, 1);
            break;
        }
        case PipelineTaskState::BLOCKED_FOR_SINK: {
            _wait_sink_watcher.start();
            COUNTER_UPDATE(_block_by_sink_counts, 1);
            break;
        }
        case PipelineTaskState::BLOCKED_FOR_RF: {
            _wait_bf_watcher.start();
            break;
        }
        default:
            break;
        }
        break;
    }
    default:
        break;
    }

    _cur_state = state;
}

std::string PipelineTask::debug_string() {
    fmt::memory_buffer debug_string_buffer;

    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
    fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                   print_id(fragment_context()->get_fragment_instance_id()));

    fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
                   PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));
    {
        std::stringstream profile_ss;
        _fresh_profile_counter();
        _task_profile->pretty_print(&profile_ss, "");
        fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
    }
    fmt::format_to(debug_string_buffer,
                   "PipelineTask[this = {}, state = {}]\noperators: ", (void*)this,
                   get_state_name(_cur_state));
    for (size_t i = 0; i < _operators.size(); i++) {
        fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
                       _operators[i]->debug_string());
        std::stringstream profile_ss;
        _operators[i]->get_runtime_profile()->pretty_print(&profile_ss, std::string(i * 2, ' '));
        fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
    }
    fmt::format_to(debug_string_buffer, "\n{}{}", std::string(_operators.size() * 2, ' '),
                   _sink->debug_string());
    {
        std::stringstream profile_ss;
        _sink->get_runtime_profile()->pretty_print(&profile_ss,
                                                   std::string(_operators.size() * 2, ' '));
        fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
    }
    return fmt::to_string(debug_string_buffer);
}

// Returns the task-group entity of the fragment context that owns this task.
taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const {
    taskgroup::TaskGroupPipelineTaskEntity* const entity =
            _fragment_context->get_task_group_entity();
    return entity;
}

} // namespace doris::pipeline
