blob: 98a9b284ec6bb6799a8d31550798c7117c27fd09 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RelMetric.h"
#include <Processors/IProcessor.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
#include <Common/QueryContext.h>
using namespace rapidjson;
namespace ProfileEvents
{
extern const Event FileSegmentWaitReadBufferMicroseconds;
extern const Event FileSegmentReadMicroseconds;
extern const Event FileSegmentCacheWriteMicroseconds;
extern const Event FileSegmentPredownloadMicroseconds;
extern const Event FileSegmentUsedBytes;
extern const Event CachedReadBufferReadFromSourceMicroseconds;
extern const Event CachedReadBufferReadFromCacheMicroseconds;
extern const Event CachedReadBufferCacheWriteMicroseconds;
extern const Event CachedReadBufferReadFromSourceBytes;
extern const Event CachedReadBufferReadFromCacheBytes;
extern const Event CachedReadBufferCacheWriteBytes;
extern const Event CachedReadBufferCreateBufferMicroseconds;
extern const Event CachedReadBufferReadFromCacheHits;
extern const Event CachedReadBufferReadFromCacheMisses;
}
namespace local_engine
{
static void writeCacheHits(Writer<StringBuffer> & writer)
{
const auto thread_group = QueryContext::currentThreadGroup();
auto & counters = thread_group->performance_counters;
auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load();
auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load();
auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load();
auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load();
auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000;
auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000;
writer.Key("read_cache_hits");
writer.Uint64(read_cache_hits);
writer.Key("miss_cache_hits");
writer.Uint64(miss_cache_hits);
writer.Key("read_cache_bytes");
writer.Uint64(read_cache_bytes);
writer.Key("read_miss_bytes");
writer.Uint64(read_miss_bytes);
writer.Key("read_cache_millisecond");
writer.Uint64(read_cache_millisecond);
writer.Key("miss_cache_millisecond");
writer.Uint64(miss_cache_millisecond);
}
RelMetric::RelMetric(size_t id_, const String & name_, std::vector<DB::IQueryPlanStep *> & steps_) : id(id_), name(name_), steps(steps_)
{
}
RelMetric::RelMetric(const String & name_, const std::vector<RelMetricPtr> & inputs_, std::vector<DB::IQueryPlanStep *> & steps_)
: name(name_), steps(steps_), inputs(inputs_)
{
auto rel = std::max_element(inputs.begin(), inputs.end(), [](RelMetricPtr a, RelMetricPtr b) { return a->id < b->id; });
id = rel->get()->id + 1;
}
size_t RelMetric::getId() const
{
return id;
}
const std::vector<DB::IQueryPlanStep *> & RelMetric::getSteps() const
{
return steps;
}
const std::vector<RelMetricPtr> & RelMetric::getInputs() const
{
return inputs;
}
RelMetricTimes RelMetric::getTotalTime() const
{
RelMetricTimes timeMetrics{0, 0, 0};
if (!steps.empty())
{
for (const auto * step : steps)
{
if (!step->getProcessors().empty())
{
for (const auto & processor : step->getProcessors())
{
timeMetrics.time += processor->getElapsedNs() / 1000U;
timeMetrics.input_wait_elapsed_us += processor->getInputWaitElapsedNs() / 1000U;
timeMetrics.output_wait_elapsed_us += processor->getInputWaitElapsedNs() / 1000U;
}
}
}
}
return timeMetrics;
}
void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const
{
writer.StartObject();
writer.Key("id");
writer.Uint64(id);
writer.Key("name");
writer.String(name.c_str());
RelMetricTimes timeMetrics = getTotalTime();
writer.Key("time");
writer.Uint64(timeMetrics.time);
writer.Key("input_wait_time");
writer.Uint64(timeMetrics.input_wait_elapsed_us);
writer.Key("output_wait_time");
writer.Uint64(timeMetrics.output_wait_elapsed_us);
if (!steps.empty())
{
writer.Key("steps");
writer.StartArray();
for (const auto & step : steps)
{
writer.StartObject();
writer.Key("name");
writer.String(step->getName().c_str());
writer.Key("description");
writer.String(step->getStepDescription().c_str());
writer.Key("processors");
writer.StartArray();
for (const auto & processor : step->getProcessors())
{
writer.StartObject();
writer.Key("name");
writer.String(processor->getName().c_str());
writer.Key("time");
writer.Uint64(processor->getElapsedNs() / 1000U);
writer.Key("output_rows");
writer.Uint64(processor->getProcessorDataStats().output_rows);
writer.Key("output_bytes");
writer.Uint64(processor->getProcessorDataStats().output_bytes);
writer.Key("input_rows");
writer.Uint64(processor->getProcessorDataStats().input_rows);
writer.Key("input_bytes");
writer.Uint64(processor->getProcessorDataStats().input_bytes);
writer.EndObject();
}
writer.EndArray();
if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree *>(step))
{
auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk;
auto selected_marks = read_mergetree->getAnalysisResult().selected_marks;
auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk;
writer.Key("selected_marks_pk");
writer.Uint64(selected_marks_pk);
writer.Key("selected_marks");
writer.Uint64(selected_marks);
writer.Key("total_marks_pk");
writer.Uint64(total_marks_pk);
writeCacheHits(writer);
}
else if (dynamic_cast<SubstraitFileSourceStep *>(step))
{
writeCacheHits(writer);
}
writer.EndObject();
}
writer.EndArray();
}
writer.EndObject();
}
const String & RelMetric::getName() const
{
return name;
}
std::string RelMetricSerializer::serializeRelMetric(const RelMetricPtr & rel_metric, bool flatten)
{
StringBuffer result;
Writer<StringBuffer> writer(result);
if (flatten)
{
writer.StartArray();
std::stack<RelMetricPtr> metrics;
metrics.push(rel_metric);
while (!metrics.empty())
{
auto metric = metrics.top();
metrics.pop();
for (const auto & item : metric->getInputs())
metrics.push(item);
metric->serialize(writer);
}
writer.EndArray();
}
return result.GetString();
}
}