blob: 027f2fcb61d5a257c6e261df15bb616d5e0fd93d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <string>
#include "common/factory_creator.h"
#include "common/status.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
class MemTrackerLimiter;
class ResourceContext;
class MemoryContext : public std::enable_shared_from_this<MemoryContext> {
ENABLE_FACTORY_CREATOR(MemoryContext);
public:
/*
* 1. operate them thread-safe.
* 2. all tasks are unified.
* 3. should not be operated frequently, use local variables to update Counter.
*/
struct Stats {
RuntimeProfile::Counter* current_memory_bytes_counter_;
RuntimeProfile::Counter* peak_memory_bytes_counter_;
// Maximum memory peak for all backends.
// only set once by result sink when closing.
RuntimeProfile::Counter* max_peak_memory_bytes_counter_;
// The total number of times that the revoke method is called.
RuntimeProfile::Counter* revoke_attempts_counter_;
// The time that waiting for revoke finished.
RuntimeProfile::Counter* revoke_wait_time_ms_counter_;
// The revoked bytes
RuntimeProfile::Counter* revoked_bytes_counter_;
RuntimeProfile* profile() { return profile_.get(); }
void init_profile() {
profile_ = std::make_unique<RuntimeProfile>("MemoryContext");
current_memory_bytes_counter_ =
ADD_COUNTER(profile_, "CurrentMemoryBytes", TUnit::BYTES);
peak_memory_bytes_counter_ = ADD_COUNTER(profile_, "PeakMemoryBytes", TUnit::BYTES);
max_peak_memory_bytes_counter_ =
ADD_COUNTER(profile_, "MaxPeakMemoryBytes", TUnit::BYTES);
revoke_attempts_counter_ = ADD_COUNTER(profile_, "RevokeAttempts", TUnit::UNIT);
revoke_wait_time_ms_counter_ =
ADD_COUNTER(profile_, "RevokeWaitTimeMs", TUnit::TIME_MS);
revoked_bytes_counter_ = ADD_COUNTER(profile_, "RevokedBytes", TUnit::BYTES);
}
std::string debug_string() { return profile_->pretty_print(); }
private:
std::unique_ptr<RuntimeProfile> profile_;
};
MemoryContext() { stats_.init_profile(); }
virtual ~MemoryContext() = default;
RuntimeProfile* stats_profile() { return stats_.profile(); }
std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return mem_tracker_; }
void set_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
mem_tracker_ = mem_tracker;
user_set_mem_limit_ = mem_tracker_->limit();
adjusted_mem_limit_ = mem_tracker_->limit();
}
// This method is called by workload group manager to set query's memlimit using slot
// If user set query limit explicitly, then should use less one
void set_mem_limit(int64_t new_mem_limit) const { mem_tracker_->set_limit(new_mem_limit); }
int64_t mem_limit() const { return mem_tracker_->limit(); }
int64_t user_set_mem_limit() const { return user_set_mem_limit_; }
// The new memlimit should be less than user set memlimit.
void set_adjusted_mem_limit(int64_t new_mem_limit) {
adjusted_mem_limit_ = std::min<int64_t>(new_mem_limit, user_set_mem_limit_);
}
// Expected mem limit is the limit when workload group reached limit.
int64_t adjusted_mem_limit() { return adjusted_mem_limit_; }
int64_t current_memory_bytes() const { return mem_tracker_->consumption(); }
int64_t peak_memory_bytes() const { return mem_tracker_->peak_consumption(); }
int64_t reserved_consumption() const { return mem_tracker_->reserved_consumption(); }
// TODO, use stats_.max_peak_memory_bytes_counter_->value();
int64_t max_peak_memory_bytes() const { return mem_tracker_->peak_consumption(); }
int64_t revoke_attempts() const { return stats_.revoke_attempts_counter_->value(); }
int64_t revoke_wait_time_ms() const { return stats_.revoke_wait_time_ms_counter_->value(); }
int64_t revoked_bytes() const { return stats_.revoked_bytes_counter_->value(); }
std::string debug_string();
protected:
friend class ResourceContext;
void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = resource_ctx; }
Stats stats_;
// MemTracker that is shared by all fragment instances running on this host.
std::shared_ptr<MemTrackerLimiter> mem_tracker_ {nullptr};
ResourceContext* resource_ctx_ {nullptr};
int64_t user_set_mem_limit_ = 0;
std::atomic<int64_t> adjusted_mem_limit_ = 0;
};
#include "common/compile_check_end.h"
} // namespace doris