// Source blob: f5b05b9d3edf22ccd1e15f13a320e1ea9edc59b6
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/pipeline_task_action.h"
#include <sstream>
#include <string>
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
namespace doris {
// Answers the request with the unfiltered pipeline-task dump.
//
// A plain-text Content-Type header is attached to the request first; the
// value returned by the fragment manager's dump_pipeline_tasks() is then
// handed to HttpChannel::send_reply together with HttpStatus::OK.
//
// @param req  the HTTP request being served.
void PipelineTaskAction::handle(HttpRequest* req) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");

    // Produce the dump up front so the reply call below stays short.
    const auto task_dump = ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks();
    HttpChannel::send_reply(req, HttpStatus::OK, task_dump);
}
// Answers the request with a pipeline-task dump parameterised by a duration.
//
// The "duration" request parameter is parsed as a signed 64-bit decimal
// integer and forwarded to the fragment manager's
// dump_pipeline_tasks(duration).
// NOTE(review): the unit of "duration" and how it filters tasks are decided
// by dump_pipeline_tasks, which is not visible here -- confirm before
// documenting it for users.
//
// The whole parameter must be numeric: std::stoll on its own would accept a
// numeric prefix (e.g. "10abc" -> 10), so the number of consumed characters
// is compared against the parameter length and any leftover is rejected.
//
// On a parse failure the error is logged at WARNING level and sent back with
// HttpStatus::INTERNAL_SERVER_ERROR. That status is kept as-is so existing
// clients see no change.
//
// @param req  the HTTP request being served.
void LongPipelineTaskAction::handle(HttpRequest* req) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");

    const auto& duration_str = req->param("duration");
    int64_t duration = 0;
    bool parsed = false;
    std::string parse_error;
    try {
        size_t consumed = 0;
        duration = std::stoll(duration_str, &consumed);
        if (consumed == duration_str.size()) {
            parsed = true;
        } else {
            // stoll stopped early: something non-numeric follows the number.
            parse_error = "unexpected trailing characters";
        }
    } catch (const std::exception& e) {
        // std::stoll reports an empty/non-numeric or out-of-range value by throwing.
        parse_error = e.what();
    }

    if (!parsed) {
        // Render the message once and reuse it for both the log and the reply.
        const std::string error_msg =
                fmt::format("invalid argument.duration: {}, meet error: {}", duration_str,
                            parse_error);
        LOG(WARNING) << error_msg;
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
        return;
    }

    HttpChannel::send_reply(req, HttpStatus::OK,
                            ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(duration));
}
// Answers the request with the pipeline-task dump of a single query.
//
// The "query_id" request parameter must have the form "{hi}-{lo}": 16
// characters, a '-' separator, and 16 more characters (33 in total). Both
// the overall length and the separator are checked; anything else is
// answered with HttpStatus::INTERNAL_SERVER_ERROR and an explanatory message.
// The two halves are converted with from_hex() into the hi/lo fields of a
// TUniqueId, which is passed to the fragment manager's
// dump_pipeline_tasks(query_id).
// NOTE(review): from_hex() is defined outside this file; whether it rejects
// non-hexadecimal characters could not be confirmed from here.
//
// Any std::exception raised while decoding is logged at WARNING level and
// reported back with HttpStatus::INTERNAL_SERVER_ERROR.
//
// @param req  the HTTP request being served.
void QueryPipelineTaskAction::handle(HttpRequest* req) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");

    // Number of characters used for each half of the id.
    constexpr size_t kHalfLen = 16;

    int64_t high = 0;
    int64_t low = 0;
    try {
        auto& query_id_str = req->param("query_id");
        // The length test comes first so the separator lookup is always in range.
        const bool well_formed = query_id_str.length() == kHalfLen * 2 + 1 &&
                                 query_id_str[kHalfLen] == '-';
        if (!well_formed) {
            HttpChannel::send_reply(
                    req, HttpStatus::INTERNAL_SERVER_ERROR,
                    "Invalid query id! Query id should be {hi}-{lo} which is a hexadecimal. \n");
            return;
        }
        from_hex(&high, query_id_str.substr(0, kHalfLen));
        // Skip the separator: the low half starts right after it.
        from_hex(&low, query_id_str.substr(kHalfLen + 1));
    } catch (const std::exception& e) {
        // Render the message once and reuse it for both the log and the reply.
        const std::string error_msg =
                fmt::format("invalid argument.query_id: {}, meet error: {}. \n",
                            req->param("query_id"), e.what());
        LOG(WARNING) << error_msg;
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
        return;
    }

    TUniqueId query_id;
    query_id.hi = high;
    query_id.lo = low;
    HttpChannel::send_reply(req, HttpStatus::OK,
                            ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(query_id));
}
} // end namespace doris