blob: 851882d13f9b838535ce29373567ba074fbda59a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/schema_scanner/schema_load_job_scanner.h"
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/FrontendService_types.h>
#include <string>
#include "exec/schema_scanner/schema_helper.h"
#include "runtime/runtime_state.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris {
class RuntimeState;
namespace vectorized {
class Block;
} // namespace vectorized
std::vector<SchemaScanner::ColumnDesc> SchemaLoadJobScanner::_s_tbls_columns = {
// name, type, size, is_null
{"JOB_ID", TYPE_STRING, sizeof(StringRef), true},
{"LABEL", TYPE_STRING, sizeof(StringRef), true},
{"STATE", TYPE_STRING, sizeof(StringRef), true},
{"PROGRESS", TYPE_STRING, sizeof(StringRef), true},
{"TYPE", TYPE_STRING, sizeof(StringRef), true},
{"ETL_INFO", TYPE_STRING, sizeof(StringRef), true},
{"TASK_INFO", TYPE_STRING, sizeof(StringRef), true},
{"ERROR_MSG", TYPE_STRING, sizeof(StringRef), true},
{"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true},
{"ETL_START_TIME", TYPE_STRING, sizeof(StringRef), true},
{"ETL_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true},
{"LOAD_START_TIME", TYPE_STRING, sizeof(StringRef), true},
{"LOAD_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true},
{"URL", TYPE_STRING, sizeof(StringRef), true},
{"JOB_DETAILS", TYPE_STRING, sizeof(StringRef), true},
{"TRANSACTION_ID", TYPE_STRING, sizeof(StringRef), true},
{"ERROR_TABLETS", TYPE_STRING, sizeof(StringRef), true},
{"USER", TYPE_STRING, sizeof(StringRef), true},
{"COMMENT", TYPE_STRING, sizeof(StringRef), true},
{"FIRST_ERROR_MSG", TYPE_STRING, sizeof(StringRef), true},
};
SchemaLoadJobScanner::SchemaLoadJobScanner()
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_LOAD_JOBS) {}
SchemaLoadJobScanner::~SchemaLoadJobScanner() {}
Status SchemaLoadJobScanner::start(RuntimeState* state) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}
TFetchLoadJobRequest request;
RETURN_IF_ERROR(SchemaHelper::fetch_load_job(*(_param->common_param->ip),
_param->common_param->port, request, &_result));
return Status::OK();
}
Status SchemaLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
if (block == nullptr || eos == nullptr) {
return Status::InternalError("invalid parameter.");
}
*eos = true;
if (_result.loadJobs.empty()) {
return Status::OK();
}
return _fill_block_impl(block);
}
Status SchemaLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
SCOPED_TIMER(_fill_block_timer);
const auto& jobs_info = _result.loadJobs;
size_t row_num = jobs_info.size();
if (row_num == 0) {
return Status::OK();
}
for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) {
const auto& col_desc = _s_tbls_columns[col_idx];
std::vector<StringRef> str_refs(row_num);
std::vector<void*> datas(row_num);
std::vector<std::string> column_values(row_num);
for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
const auto& job_info = jobs_info[row_idx];
std::string& column_value = column_values[row_idx];
if (col_desc.type == TYPE_STRING) {
switch (col_idx) {
case 0: // JOB_ID
column_value = job_info.__isset.job_id ? job_info.job_id : "";
break;
case 1: // LABEL
column_value = job_info.__isset.label ? job_info.label : "";
break;
case 2: // STATE
column_value = job_info.__isset.state ? job_info.state : "";
break;
case 3: // PROGRESS
column_value = job_info.__isset.progress ? job_info.progress : "";
break;
case 4: // TYPE
column_value = job_info.__isset.type ? job_info.type : "";
break;
case 5: // ETL_INFO
column_value = job_info.__isset.etl_info ? job_info.etl_info : "";
break;
case 6: // TASK_INFO
column_value = job_info.__isset.task_info ? job_info.task_info : "";
break;
case 7: // ERROR_MSG
column_value = job_info.__isset.error_msg ? job_info.error_msg : "";
break;
case 8: // CREATE_TIME
column_value = job_info.__isset.create_time ? job_info.create_time : "";
break;
case 9: // ETL_START_TIME
column_value = job_info.__isset.etl_start_time ? job_info.etl_start_time : "";
break;
case 10: // ETL_FINISH_TIME
column_value = job_info.__isset.etl_finish_time ? job_info.etl_finish_time : "";
break;
case 11: // LOAD_START_TIME
column_value = job_info.__isset.load_start_time ? job_info.load_start_time : "";
break;
case 12: // LOAD_FINISH_TIME
column_value =
job_info.__isset.load_finish_time ? job_info.load_finish_time : "";
break;
case 13: // URL
column_value = job_info.__isset.url ? job_info.url : "";
break;
case 14: // JOB_DETAILS
column_value = job_info.__isset.job_details ? job_info.job_details : "";
break;
case 15: // TRANSACTION_ID
column_value = job_info.__isset.transaction_id ? job_info.transaction_id : "";
break;
case 16: // ERROR_TABLETS
column_value = job_info.__isset.error_tablets ? job_info.error_tablets : "";
break;
case 17: // USER
column_value = job_info.__isset.user ? job_info.user : "";
break;
case 18: // COMMENT
column_value = job_info.__isset.comment ? job_info.comment : "";
break;
case 19: // FIRST_ERROR_MSG
column_value = job_info.__isset.first_error_msg ? job_info.first_error_msg : "";
break;
}
str_refs[row_idx] =
StringRef(column_values[row_idx].data(), column_values[row_idx].size());
datas[row_idx] = &str_refs[row_idx];
}
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas));
}
return Status::OK();
}
} // namespace doris