blob: c0720c5580bc7212091e30ac05a86c1c7fb2f39b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/vmysql_scan_node.h"
#include <gen_cpp/PlanNodes_types.h>
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/types.h"
#include "vec/common/string_ref.h"
namespace doris::vectorized {
VMysqlScanNode::VMysqlScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
_is_init(false),
_table_name(tnode.mysql_scan_node.table_name),
_tuple_id(tnode.mysql_scan_node.tuple_id),
_columns(tnode.mysql_scan_node.columns),
_filters(tnode.mysql_scan_node.filters),
_tuple_desc(nullptr),
_slot_num(0) {}
Status VMysqlScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "VMysqlScanNode::Prepare";
if (_is_init) {
return Status::OK();
}
if (nullptr == state) {
return Status::InternalError("input pointer is nullptr.");
}
RETURN_IF_ERROR(ScanNode::prepare(state));
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (nullptr == _tuple_desc) {
return Status::InternalError("Failed to get tuple descriptor.");
}
_slot_num = _tuple_desc->slots().size();
// get mysql info
const MySQLTableDescriptor* mysql_table =
static_cast<const MySQLTableDescriptor*>(_tuple_desc->table_desc());
if (nullptr == mysql_table) {
return Status::InternalError("mysql table pointer is nullptr.");
}
_my_param.host = mysql_table->host();
_my_param.port = mysql_table->port();
_my_param.user = mysql_table->user();
_my_param.passwd = mysql_table->passwd();
_my_param.db = mysql_table->mysql_db();
_my_param.charset = mysql_table->charset();
// new one scanner
_mysql_scanner.reset(new (std::nothrow) MysqlScanner(_my_param));
if (_mysql_scanner == nullptr) {
return Status::InternalError("new a mysql scanner failed.");
}
_text_serdes = create_data_type_serdes(_tuple_desc->slots());
for (int i = 0; i < _slot_num; ++i) {
auto& slot_desc = _tuple_desc->slots()[i];
if (slot_desc->is_materialized() && _text_serdes[i].get() == nullptr) {
return Status::InternalError("new a {} serde failed.", slot_desc->type().type);
}
}
_is_init = true;
return Status::OK();
}
Status VMysqlScanNode::open(RuntimeState* state) {
if (nullptr == state) {
return Status::InternalError("input pointer is nullptr.");
}
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
VLOG_CRITICAL << "MysqlScanNode::Open";
if (!_is_init) {
return Status::InternalError("used before initialize.");
}
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(_mysql_scanner->open());
RETURN_IF_ERROR(_mysql_scanner->query(_table_name, _columns, _filters, _limit));
// check materialize slot num
int materialize_num = 0;
for (int i = 0; i < _tuple_desc->slots().size(); ++i) {
if (_tuple_desc->slots()[i]->is_materialized()) {
materialize_num++;
}
}
if (_mysql_scanner->field_num() != materialize_num) {
return Status::InternalError("input and output not equal.");
}
return Status::OK();
}
Status VMysqlScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
if (state == nullptr || block == nullptr || eos == nullptr) {
return Status::InternalError("input is nullptr");
}
VLOG_CRITICAL << "VMysqlScanNode::GetNext";
if (!_is_init) {
return Status::InternalError("used before initialize.");
}
RETURN_IF_CANCELLED(state);
bool mem_reuse = block->mem_reuse();
DCHECK(block->rows() == 0);
std::vector<vectorized::MutableColumnPtr> columns(_slot_num);
bool mysql_eos = false;
do {
columns.resize(_slot_num);
for (int i = 0; i < _slot_num; ++i) {
if (mem_reuse) {
columns[i] = std::move(*block->get_by_position(i).column).mutate();
} else {
columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
}
}
while (true) {
RETURN_IF_CANCELLED(state);
int batch_size = state->batch_size();
if (columns[0]->size() == batch_size) {
break;
}
char** data = nullptr;
unsigned long* length = nullptr;
RETURN_IF_ERROR(_mysql_scanner->get_next_row(&data, &length, &mysql_eos));
if (mysql_eos) {
*eos = true;
break;
}
int j = 0;
for (int i = 0; i < _slot_num; ++i) {
auto slot_desc = _tuple_desc->slots()[i];
if (!slot_desc->is_materialized()) {
continue;
}
if (data[j] == nullptr) {
if (slot_desc->is_nullable()) {
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(columns[i].get());
nullable_column->insert_data(nullptr, 0);
} else {
return Status::InternalError(
"nonnull column contains NULL. table={}, column={}", _table_name,
slot_desc->col_name());
}
} else {
Slice slice(data[j], length[j]);
if (_text_serdes[i]->deserialize_one_cell_from_hive_text(
*columns[i].get(), slice, _text_formatOptions) != Status::OK()) {
std::stringstream ss;
ss << "Fail to convert mysql value:'" << data[j] << "' to "
<< slot_desc->type() << " on column:`" << slot_desc->col_name() + "`";
return Status::InternalError(ss.str());
}
}
j++;
}
}
auto n_columns = 0;
if (!mem_reuse) {
for (const auto slot_desc : _tuple_desc->slots()) {
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
} else {
columns.clear();
}
VLOG_ROW << "VMYSQLScanNode output rows: " << block->rows();
} while (block->rows() == 0 && !(*eos));
reached_limit(block, eos);
return Status::OK();
}
Status VMysqlScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
SCOPED_TIMER(_runtime_profile->total_time_counter());
return ExecNode::close(state);
}
void VMysqlScanNode::debug_string(int indentation_level, std::stringstream* out) const {
*out << string(indentation_level * 2, ' ');
*out << "VMysqlScanNode(tupleid=" << _tuple_id << " table=" << _table_name;
*out << ")" << std::endl;
for (int i = 0; i < _children.size(); ++i) {
_children[i]->debug_string(indentation_level + 1, out);
}
}
Status VMysqlScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
return Status::OK();
}
} // namespace doris::vectorized