blob: 49f0c376c47cbe01f96868b5486a763c053d8364 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <memory>
#include "common/status.h"
#include "operator.h"
#include "pipeline/query_cache/query_cache.h"
namespace doris {
#include "common/compile_check_begin.h"
class RuntimeState;
namespace vectorized {
class Block;
} // namespace vectorized
namespace pipeline {
class DataQueue;
class CacheSourceOperatorX;
class CacheSourceLocalState final : public PipelineXLocalState<DataQueueSharedState> {
public:
ENABLE_FACTORY_CREATOR(CacheSourceLocalState);
using Base = PipelineXLocalState<DataQueueSharedState>;
using Parent = CacheSourceOperatorX;
CacheSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {};
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
[[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
private:
friend class CacheSourceOperatorX;
friend class OperatorX<CacheSourceLocalState>;
QueryCache* _global_cache = QueryCache::instance();
std::string _cache_key {};
int64_t _version = 0;
std::vector<vectorized::BlockUPtr> _local_cache_blocks;
std::vector<int> _slot_orders;
size_t _current_query_cache_bytes = 0;
size_t _current_query_cache_rows = 0;
bool _need_insert_cache = true;
QueryCacheHandle _query_cache_handle;
std::vector<vectorized::BlockUPtr>* _hit_cache_results = nullptr;
std::vector<int> _hit_cache_column_orders;
int _hit_cache_pos = 0;
};
class CacheSourceOperatorX final : public OperatorX<CacheSourceLocalState> {
public:
using Base = OperatorX<CacheSourceLocalState>;
CacheSourceOperatorX(ObjectPool* pool, int plan_node_id, int operator_id,
const TQueryCacheParam& cache_param)
: Base(pool, plan_node_id, operator_id), _cache_param(cache_param) {
_op_name = "CACHE_SOURCE_OPERATOR";
};
#ifdef BE_TEST
CacheSourceOperatorX() = default;
#endif
~CacheSourceOperatorX() override = default;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
bool is_source() const override { return true; }
const RowDescriptor& intermediate_row_desc() const override {
return _child->intermediate_row_desc();
}
RowDescriptor& row_descriptor() override { return _child->row_descriptor(); }
const RowDescriptor& row_desc() const override { return _child->row_desc(); }
private:
TQueryCacheParam _cache_param;
bool _has_data(RuntimeState* state) const {
auto& local_state = get_local_state(state);
return local_state._shared_state->data_queue.remaining_has_data();
}
friend class CacheSourceLocalState;
};
} // namespace pipeline
#include "common/compile_check_end.h"
} // namespace doris