blob: fb8a53925f631d7f573a12a1960a818677e55c9b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline_tracing.h"
#include <absl/time/clock.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <boost/algorithm/string/predicate.hpp>
#include <chrono>
#include <cstdint>
#include <exception>
#include <memory>
#include <mutex>
#include <string>
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "io/fs/local_file_writer.h"
#include "util/time.h"
namespace doris::pipeline {
// Appends one schedule record to the trace queue of the query it belongs to.
// Does nothing while tracing is disabled (RecordType::None).
//
// The queue for a query is created lazily through _update(). The handler passed
// to _update() only creates the queue; the record itself is enqueued afterwards,
// exactly once. _update() re-runs its handler every time its compare-exchange
// fails, so enqueueing inside the handler could push the same record several
// times into a queue that is already shared with the published map.
void PipelineTracerContext::record(ScheduleRecord record) {
    if (_dump_type == RecordType::None) [[unlikely]] {
        return;
    }
    const QueryID key {record.query_id};
    while (true) {
        // Acquire pairs with the publication of the map so its contents are visible here.
        auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_acquire);
        if (auto it = map_ptr->find(key); it != map_ptr->end()) {
            it->second->enqueue(record);
            return;
        }
        // No queue for this query yet: publish a map that has one, then look it up again.
        // If the entry is removed concurrently before the re-lookup, the loop recreates it.
        _update([&](QueryTracesMap& new_map) {
            if (!new_map.contains(key)) {
                new_map[key].reset(new OneQueryTraces());
            }
        });
    }
}
// Applies `handler` to a private copy of the current trace map and publishes the
// copy by compare-exchange on `_data`. When another thread replaced `_data` in the
// meantime, a fresh copy of the newer map is made and `handler` is run again, so
// `handler` may be invoked more than once and should only modify the map it is given.
void PipelineTracerContext::_update(std::function<void(QueryTracesMap&)>&& handler) {
    // Acquire: the contents of the map being copied must be visible to this thread.
    auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_acquire);
    while (true) {
        auto new_map = std::make_shared<QueryTracesMap>(*map_ptr);
        handler(*new_map);
        // Release on success publishes the freshly built map together with its contents;
        // on failure `map_ptr` is refreshed with the current value (acquire) and we retry.
        if (std::atomic_compare_exchange_strong_explicit(&_data, &map_ptr, new_map,
                                                         std::memory_order_acq_rel,
                                                         std::memory_order_acquire)) {
            break;
        }
    }
}
// Called when a query finishes: remembers the query's workload group and, depending
// on the configured dump type, writes traces out.
//   - PerQuery: dumps this query's traces right away.
//   - Periodic: dumps the whole time slice once more than `_dump_interval_s` seconds
//     have passed since `_last_dump_time`.
//   - anything else: only the workload group is recorded.
void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t workload_group) {
    {
        std::unique_lock<std::mutex> guard(_tg_lock);
        _id_to_workload_group[query_id] = workload_group;
    }

    if (_dump_type == RecordType::PerQuery) {
        _dump_query(query_id);
        return;
    }
    if (_dump_type != RecordType::Periodic) {
        return;
    }

    auto elapsed = MonotonicSeconds() - _last_dump_time;
    if (elapsed > _dump_interval_s) {
        _dump_timeslice();
    }
}
// Updates the tracing configuration from string key/value parameters.
//
// Recognized keys:
//   "type"          - "disable"/"none", "per_query"/"perquery" or "periodic"
//                     (case-insensitive); other values are ignored.
//   "dump_interval" - dump interval in seconds, parsed with std::stoll.
//
// Returns Status::OK() when at least one setting was applied, and
// Status::InvalidArgument when nothing was applied or when "dump_interval" cannot be
// parsed. The interval is validated before anything is modified, so a malformed
// request leaves the current configuration untouched instead of letting the
// std::stoll exception escape after the type was already switched.
Status PipelineTracerContext::change_record_params(
        const std::map<std::string, std::string>& params) {
    bool has_interval = false;
    long long interval_s = 0;
    if (auto it = params.find("dump_interval"); it != params.end()) {
        try {
            interval_s = std::stoll(it->second); // s as unit
        } catch (const std::exception&) {
            // std::stoll throws std::invalid_argument / std::out_of_range on bad input.
            return Status::InvalidArgument("dump_interval must be an integer number of seconds");
        }
        has_interval = true;
    }

    bool effective = false;
    if (auto it = params.find("type"); it != params.end()) {
        if (boost::iequals(it->second, "disable") || boost::iequals(it->second, "none")) {
            _dump_type = RecordType::None;
            effective = true;
        } else if (boost::iequals(it->second, "per_query") ||
                   boost::iequals(it->second, "perquery")) {
            _dump_type = RecordType::PerQuery;
            effective = true;
        } else if (boost::iequals(it->second, "periodic")) {
            _dump_type = RecordType::Periodic;
            // Start the periodic window from now.
            _last_dump_time = MonotonicSeconds();
            effective = true;
        }
    }

    if (has_interval) {
        _dump_interval_s = interval_s;
        effective = true;
    }

    return effective ? Status::OK()
                     : Status::InvalidArgument(
                               "No qualified param in changing tracing record method");
}
// Writes all pending trace records of `query_id` to the file `<_log_dir>/query<id>`,
// then removes the query from the trace map and from the workload-group table.
//
// Throws Exception when the log file cannot be created or a write/close fails.
// If the query has no entry in the trace map, the (empty) file is still created
// and the bookkeeping is still cleaned up.
void PipelineTracerContext::_dump_query(TUniqueId query_id) {
    auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_acquire);
    auto path = _log_dir / fmt::format("query{}", to_string(query_id));
    // NOTE(review): the mode requests the setuid/setgid bits and world-writable access
    // for a log file — confirm this is intended.
    int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
                    S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
    if (fd < 0) [[unlikely]] {
        throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
                "create tracing log file {} failed", path.c_str()));
    }
    auto writer = io::LocalFileWriter {path, fd};

    // Use find() rather than operator[]: operator[] would insert a null entry into the
    // shared map for an unknown query and then dereference it.
    if (auto it = map_ptr->find(QueryID {query_id}); it != map_ptr->end()) {
        // The workload group is the same for every record of this query, so look it up
        // once instead of taking the lock per record.
        uint64_t workload_group = 0;
        {
            std::unique_lock<std::mutex> l(_tg_lock);
            workload_group = _id_to_workload_group.at(query_id);
        }
        ScheduleRecord record;
        while (it->second->try_dequeue(record)) {
            auto tmp_str = record.to_string(workload_group);
            auto text = Slice {tmp_str};
            THROW_IF_ERROR(writer.appendv(&text, 1));
        }
    }
    THROW_IF_ERROR(writer.close());
    _last_dump_time = MonotonicSeconds();

    // Erase from the copy handed to the handler, not from `_data`: _update() publishes
    // the copy, so erasing from the live map would both mutate it in place and leave
    // the entry present in the map that gets published.
    _update([&](QueryTracesMap& new_map) { new_map.erase(QueryID {query_id}); });
    {
        std::unique_lock<std::mutex> l(_tg_lock);
        _id_to_workload_group.erase(query_id);
    }
}
void PipelineTracerContext::_dump_timeslice() {
auto new_map = std::make_shared<QueryTracesMap>();
new_map.swap(_data);
//TODO: if long time, per timeslice per file
auto path = _log_dir /
fmt::format("until{}", std::chrono::steady_clock::now().time_since_epoch().count());
int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
if (fd < 0) [[unlikely]] {
throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
"create tracing log file {} failed", path.c_str()));
}
auto writer = io::LocalFileWriter {path, fd};
// dump all query traces in this time window to one file.
for (auto& [query_id, trace] : (*new_map)) {
ScheduleRecord record;
while (trace->try_dequeue(record)) {
uint64_t v = 0;
{
std::unique_lock<std::mutex> l(_tg_lock);
v = _id_to_workload_group.at(query_id.query_id);
}
auto tmp_str = record.to_string(v);
auto text = Slice {tmp_str};
THROW_IF_ERROR(writer.appendv(&text, 1));
}
}
THROW_IF_ERROR(writer.close());
_last_dump_time = MonotonicSeconds();
_id_to_workload_group.clear();
}
} // namespace doris::pipeline