| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #include "runtime/cache/result_cache.h" |
| |
| #include <gen_cpp/internal_service.pb.h> |
| #include <glog/logging.h> |
| |
| #include <iostream> |
| #include <list> |
| #include <utility> |
| |
| #include "olap/olap_define.h" |
| #include "runtime/cache/cache_utils.h" |
| #include "util/doris_metrics.h" |
| |
| namespace doris { |
| |
| /** |
| * Remove the tail node of link |
| */ |
| ResultNode* ResultNodeList::pop() { |
| remove(_head); |
| return _head; |
| } |
| |
| void ResultNodeList::remove(ResultNode* node) { |
| if (!node) return; |
| if (node == _head) _head = node->get_next(); |
| if (node == _tail) _tail = node->get_prev(); |
| node->unlink(); |
| _node_count--; |
| } |
| |
| void ResultNodeList::push_back(ResultNode* node) { |
| if (!node) return; |
| if (!_head) _head = node; |
| node->append(_tail); |
| _tail = node; |
| _node_count++; |
| } |
| |
| void ResultNodeList::move_tail(ResultNode* node) { |
| if (!node || node == _tail) return; |
| if (!_head) |
| _head = node; |
| else if (node == _head) |
| _head = node->get_next(); |
| node->unlink(); |
| node->append(_tail); |
| _tail = node; |
| } |
| |
| void ResultNodeList::delete_node(ResultNode** node) { |
| (*node)->clear(); |
| SAFE_DELETE(*node); |
| } |
| |
| void ResultNodeList::clear() { |
| LOG(INFO) << "clear result node list."; |
| while (_head) { |
| ResultNode* tmp_node = _head->get_next(); |
| _head->clear(); |
| SAFE_DELETE(_head); |
| _head = tmp_node; |
| } |
| _node_count = 0; |
| } |
| /** |
| * Find the node and update partition data |
| * New node, the node updated in the first partition will move to the tail of the list |
| */ |
| void ResultCache::update(const PUpdateCacheRequest* request, PCacheResponse* response) { |
| ResultNode* node; |
| PCacheStatus status; |
| bool update_first = false; |
| UniqueId sql_key = request->sql_key(); |
| LOG(INFO) << "update cache, sql key:" << sql_key; |
| |
| CacheWriteLock write_lock(_cache_mtx); |
| auto it = _node_map.find(sql_key); |
| if (it != _node_map.end()) { |
| node = it->second; |
| _cache_size -= node->get_data_size(); |
| _partition_count -= node->get_partition_count(); |
| status = node->update_partition(request, update_first); |
| } else { |
| node = _node_list.new_node(sql_key); |
| status = node->update_partition(request, update_first); |
| _node_list.push_back(node); |
| _node_map[sql_key] = node; |
| _node_count += 1; |
| } |
| if (update_first) { |
| _node_list.move_tail(node); |
| } |
| _cache_size += node->get_data_size(); |
| _partition_count += node->get_partition_count(); |
| response->set_status(status); |
| |
| prune(); |
| update_monitor(); |
| } |
| |
| /** |
| * Fetch cache through sql key, partition key, version and time |
| */ |
| void ResultCache::fetch(const PFetchCacheRequest* request, PFetchCacheResult* result) { |
| bool hit_first = false; |
| ResultNodeMap::iterator node_it; |
| const UniqueId sql_key = request->sql_key(); |
| LOG(INFO) << "fetch cache, sql key:" << sql_key; |
| { |
| CacheReadLock read_lock(_cache_mtx); |
| node_it = _node_map.find(sql_key); |
| if (node_it == _node_map.end()) { |
| result->set_status(PCacheStatus::NO_SQL_KEY); |
| LOG(INFO) << "no such sql key:" << sql_key; |
| return; |
| } |
| ResultNode* node = node_it->second; |
| PartitionRowBatchList part_rowbatch_list; |
| PCacheStatus status = node->fetch_partition(request, part_rowbatch_list, hit_first); |
| |
| for (auto part_it = part_rowbatch_list.begin(); part_it != part_rowbatch_list.end(); |
| part_it++) { |
| PCacheValue* srcValue = (*part_it)->get_value(); |
| if (srcValue != nullptr) { |
| PCacheValue* value = result->add_values(); |
| value->CopyFrom(*srcValue); |
| LOG(INFO) << "fetch cache partition key:" << srcValue->param().partition_key(); |
| } else { |
| LOG(WARNING) << "prowbatch of cache is null"; |
| status = PCacheStatus::EMPTY_DATA; |
| break; |
| } |
| } |
| if (status == PCacheStatus::CACHE_OK && part_rowbatch_list.empty()) { |
| status = PCacheStatus::EMPTY_DATA; |
| } |
| result->set_status(status); |
| } |
| |
| if (hit_first) { |
| CacheWriteLock write_lock(_cache_mtx); |
| _node_list.move_tail(node_it->second); |
| } |
| } |
| |
| bool ResultCache::contains(const UniqueId& sql_key) { |
| CacheReadLock read_lock(_cache_mtx); |
| return _node_map.find(sql_key) != _node_map.end(); |
| } |
| |
| /** |
| * enum PClearType { |
| * CLEAR_ALL = 0, |
| * PRUNE_CACHE = 1, |
| * CLEAR_BEFORE_TIME = 2, |
| * CLEAR_SQL_KEY = 3 |
| * }; |
| */ |
| void ResultCache::clear(const PClearCacheRequest* request, PCacheResponse* response) { |
| LOG(INFO) << "clear cache type" << request->clear_type() |
| << ", node size:" << _node_list.get_node_count() << ", map size:" << _node_map.size(); |
| CacheWriteLock write_lock(_cache_mtx); |
| //0 clear, 1 prune, 2 before_time,3 sql_key |
| switch (request->clear_type()) { |
| case PClearType::CLEAR_ALL: |
| _node_list.clear(); |
| _node_map.clear(); |
| _cache_size = 0; |
| _node_count = 0; |
| _partition_count = 0; |
| break; |
| case PClearType::PRUNE_CACHE: |
| prune(); |
| break; |
| default: |
| break; |
| } |
| update_monitor(); |
| response->set_status(PCacheStatus::CACHE_OK); |
| } |
| |
| //private method |
| ResultNode* find_min_time_node(ResultNode* result_node) { |
| if (result_node->get_prev()) { |
| if (result_node->get_prev()->first_partition_last_time() <= |
| result_node->first_partition_last_time()) { |
| return result_node->get_prev(); |
| } |
| } |
| |
| if (result_node->get_next()) { |
| if (result_node->get_next()->first_partition_last_time() < |
| result_node->first_partition_last_time()) { |
| return result_node->get_next(); |
| } |
| } |
| return result_node; |
| } |
| |
| /* |
| * Two-dimensional array, prune the min last_read_time PartitionRowBatch. |
| * The following example is the last read time array. |
| * 1 and 2 is the read time, nodes with pruning read time < 3 |
| * Before: |
| * 1,2 //_head ResultNode* |
| * 1,2,3,4,5 |
| * 2,4,3,6,8 |
| * 5,7,9,11,13 //_tail ResultNode* |
| * After: |
| * 4,5 //_head |
| * 4,3,6,8 |
| * 5,7,9,11,13 //_tail |
| */ |
| void ResultCache::prune() { |
| if (_cache_size <= (_max_size + _elasticity_size)) { |
| return; |
| } |
| LOG(INFO) << "begin prune cache, cache_size : " << _cache_size << ", max_size : " << _max_size |
| << ", elasticity_size : " << _elasticity_size; |
| ResultNode* result_node = _node_list.get_head(); |
| while (_cache_size > _max_size) { |
| if (result_node == nullptr) { |
| break; |
| } |
| result_node = find_min_time_node(result_node); |
| _cache_size -= result_node->prune_first(); |
| if (result_node->get_data_size() == 0) { |
| ResultNode* next_node; |
| if (result_node->get_next()) { |
| next_node = result_node->get_next(); |
| } else if (result_node->get_prev()) { |
| next_node = result_node->get_prev(); |
| } else { |
| next_node = _node_list.get_head(); |
| } |
| remove(result_node); |
| result_node = next_node; |
| } |
| } |
| LOG(INFO) << "finish prune, cache_size : " << _cache_size; |
| _node_count = _node_map.size(); |
| _cache_size = 0; |
| _partition_count = 0; |
| for (auto node_it = _node_map.begin(); node_it != _node_map.end(); node_it++) { |
| _partition_count += node_it->second->get_partition_count(); |
| _cache_size += node_it->second->get_data_size(); |
| } |
| } |
| |
| void ResultCache::remove(ResultNode* result_node) { |
| auto node_it = _node_map.find(result_node->get_sql_key()); |
| if (node_it != _node_map.end()) { |
| _node_map.erase(node_it); |
| _node_list.remove(result_node); |
| _node_list.delete_node(&result_node); |
| } |
| } |
| |
| void ResultCache::update_monitor() { |
| DorisMetrics::instance()->query_cache_memory_total_byte->set_value(_cache_size); |
| DorisMetrics::instance()->query_cache_sql_total_count->set_value(_node_count); |
| DorisMetrics::instance()->query_cache_partition_total_count->set_value(_partition_count); |
| } |
| |
| } // namespace doris |