blob: 59f25f17e3cc231d56f79ac5a98efe57e3f074c0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/schema_scanner/schema_rowsets_scanner.h"
#include <gen_cpp/Descriptors_types.h>
#include <algorithm>
#include <cstddef>
#include <memory>
#include <shared_mutex>
#include <string>
#include <utility>
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "runtime/define_primitive_type.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/common/string_ref.h"
namespace doris {
namespace vectorized {
class Block;
} // namespace vectorized
#include "common/compile_check_begin.h"
std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
// name, type, size, is_null
{"BACKEND_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"ROWSET_ID", TYPE_VARCHAR, sizeof(StringRef), true},
{"TABLET_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"ROWSET_NUM_ROWS", TYPE_BIGINT, sizeof(int64_t), true},
{"TXN_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"NUM_SEGMENTS", TYPE_BIGINT, sizeof(int64_t), true},
{"START_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
{"END_VERSION", TYPE_BIGINT, sizeof(int64_t), true},
{"INDEX_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
{"DATA_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
{"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
{"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
{"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
};
SchemaRowsetsScanner::SchemaRowsetsScanner()
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROWSETS),
backend_id_(0),
_rowsets_idx(0) {};
Status SchemaRowsetsScanner::start(RuntimeState* state) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}
backend_id_ = state->backend_id();
RETURN_IF_ERROR(_get_all_rowsets());
return Status::OK();
}
Status SchemaRowsetsScanner::_get_all_rowsets() {
if (config::is_cloud_mode()) {
// only query cloud tablets in lru cache instead of all tablets
std::vector<std::weak_ptr<CloudTablet>> tablets =
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr().get_weak_tablets();
for (const std::weak_ptr<CloudTablet>& tablet : tablets) {
if (!tablet.expired()) {
auto t = tablet.lock();
std::shared_lock rowset_ldlock(t->get_header_lock());
for (const auto& it : t->rowset_map()) {
rowsets_.emplace_back(it.second);
}
}
}
return Status::OK();
}
std::vector<TabletSharedPtr> tablets =
ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager()->get_all_tablet();
for (const auto& tablet : tablets) {
// all rowset
std::vector<std::pair<Version, RowsetSharedPtr>> all_rowsets;
{
std::shared_lock rowset_ldlock(tablet->get_header_lock());
tablet->acquire_version_and_rowsets(&all_rowsets);
}
for (const auto& version_and_rowset : all_rowsets) {
RowsetSharedPtr rowset = version_and_rowset.second;
rowsets_.emplace_back(rowset);
}
}
return Status::OK();
}
Status SchemaRowsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}
if (_rowsets_idx >= rowsets_.size()) {
*eos = true;
return Status::OK();
}
*eos = false;
return _fill_block_impl(block);
}
Status SchemaRowsetsScanner::_fill_block_impl(vectorized::Block* block) {
SCOPED_TIMER(_fill_block_timer);
size_t fill_rowsets_num = std::min(1000UL, rowsets_.size() - _rowsets_idx);
size_t fill_idx_begin = _rowsets_idx;
size_t fill_idx_end = _rowsets_idx + fill_rowsets_num;
std::vector<void*> datas(fill_rowsets_num);
// BACKEND_ID
{
int64_t src = backend_id_;
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
datas[i - fill_idx_begin] = &src;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, datas));
}
// ROWSET_ID
{
std::vector<std::string> rowset_ids(fill_rowsets_num);
std::vector<StringRef> strs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
rowset_ids[i - fill_idx_begin] = rowset->rowset_id().to_string();
strs[i - fill_idx_begin] = StringRef(rowset_ids[i - fill_idx_begin].c_str(),
rowset_ids[i - fill_idx_begin].size());
datas[i - fill_idx_begin] = strs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 1, datas));
}
// TABLET_ID
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->rowset_meta()->tablet_id();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 2, datas));
}
// ROWSET_NUM_ROWS
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->num_rows();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 3, datas));
}
// TXN_ID
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->txn_id();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 4, datas));
}
// NUM_SEGMENTS
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->num_segments();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 5, datas));
}
// START_VERSION
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->start_version();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 6, datas));
}
// END_VERSION
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->end_version();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 7, datas));
}
// INDEX_DISK_SIZE
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->index_disk_size();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 8, datas));
}
// DATA_DISK_SIZE
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->data_disk_size();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 9, datas));
}
// CREATION_TIME
{
std::vector<VecDateTimeValue> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
int64_t creation_time = rowset->creation_time();
srcs[i - fill_idx_begin].from_unixtime(creation_time, _timezone_obj);
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 10, datas));
}
// NEWEST_WRITE_TIMESTAMP
{
std::vector<VecDateTimeValue> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
int64_t newest_write_timestamp = rowset->newest_write_timestamp();
srcs[i - fill_idx_begin].from_unixtime(newest_write_timestamp, _timezone_obj);
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
// SCHEMA_VERSION
{
std::vector<int32_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->tablet_schema()->schema_version();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
}
_rowsets_idx += fill_rowsets_num;
return Status::OK();
}
} // namespace doris