| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/sink/vmysql_result_writer.h" |
| |
| #include <fmt/core.h> |
| #include <gen_cpp/Data_types.h> |
| #include <gen_cpp/Metrics_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/internal_service.pb.h> |
| #include <glog/logging.h> |
| #include <stdint.h> |
| #include <string.h> |
| #include <sys/types.h> |
| |
| #include <ostream> |
| #include <string> |
| |
| #include "common/cast_set.h" |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/config.h" |
| #include "runtime/define_primitive_type.h" |
| #include "runtime/result_block_buffer.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime/types.h" |
| #include "udf/udf.h" |
| #include "util/mysql_global.h" |
| #include "vec/aggregate_functions/aggregate_function.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_const.h" |
| #include "vec/core/block.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/core/types.h" |
| #include "vec/data_types/data_type_array.h" |
| #include "vec/data_types/data_type_decimal.h" |
| #include "vec/data_types/data_type_map.h" |
| #include "vec/data_types/data_type_nullable.h" |
| #include "vec/data_types/data_type_struct.h" |
| #include "vec/data_types/serde/data_type_serde.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| |
| void GetResultBatchCtx::on_failure(const Status& status) { |
| DCHECK(!status.ok()) << "status is ok, errmsg=" << status; |
| status.to_protobuf(_result->mutable_status()); |
| _done->Run(); |
| } |
| |
| void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) { |
| Status status; |
| status.to_protobuf(_result->mutable_status()); |
| PQueryStatistics* statistics = _result->mutable_query_statistics(); |
| statistics->set_returned_rows(returned_rows); |
| _result->set_packet_seq(packet_seq); |
| _result->set_eos(true); |
| _done->Run(); |
| } |
| |
| Status GetResultBatchCtx::on_data(const std::shared_ptr<TFetchDataResult>& t_result, |
| int64_t packet_seq, ResultBlockBufferBase* buffer) { |
| Status st = Status::OK(); |
| if (t_result != nullptr) { |
| uint8_t* buf = nullptr; |
| uint32_t len = 0; |
| ThriftSerializer ser(false, 4096); |
| RETURN_IF_ERROR(ser.serialize(&t_result->result_batch, &len, &buf)); |
| _result->set_row_batch(std::string((const char*)buf, len)); |
| } else { |
| _result->clear_row_batch(); |
| _result->set_empty_batch(true); |
| } |
| _result->set_packet_seq(packet_seq); |
| _result->set_eos(false); |
| |
| /// The size limit of proto buffer message is 2G |
| if (_result->ByteSizeLong() > _max_msg_size) { |
| st = Status::InternalError("Message size exceeds 2GB: {}", _result->ByteSizeLong()); |
| _result->clear_row_batch(); |
| _result->set_empty_batch(true); |
| } |
| st.to_protobuf(_result->mutable_status()); |
| _done->Run(); |
| return Status::OK(); |
| } |
| |
| VMysqlResultWriter::VMysqlResultWriter(std::shared_ptr<ResultBlockBufferBase> sinker, |
| const VExprContextSPtrs& output_vexpr_ctxs, |
| RuntimeProfile* parent_profile, bool is_binary_format) |
| : ResultWriter(), |
| _sinker(std::dynamic_pointer_cast<MySQLResultBlockBuffer>(sinker)), |
| _output_vexpr_ctxs(output_vexpr_ctxs), |
| _parent_profile(parent_profile), |
| _is_binary_format(is_binary_format) {} |
| |
| Status VMysqlResultWriter::init(RuntimeState* state) { |
| _init_profile(); |
| set_output_object_data(state->return_object_data_as_binary()); |
| _is_dry_run = state->query_options().dry_run_query; |
| return Status::OK(); |
| } |
| |
| void VMysqlResultWriter::_init_profile() { |
| if (_parent_profile != nullptr) { |
| // for PointQueryExecutor, _parent_profile is null |
| _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime"); |
| _convert_tuple_timer = |
| ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime"); |
| _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime"); |
| _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT); |
| _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); |
| } |
| } |
| |
/// Converts every row of `block` into one MySQL result row and hands the batch to `_sinker`.
///
/// Each row becomes one string in `result->result_batch.rows`. Without `_is_binary_format`
/// the text protocol is used, with the dialect selected by the `serde_dialect` query option
/// (DORIS / PRESTO / HIVE; any other dialect writes NULL for every cell). With
/// `_is_binary_format` the binary row protocol is used, except that ARRAY / MAP / STRUCT /
/// QUANTILE_STATE / HLL / BITMAP cells are written as MySQL-text strings.
///
/// On a dry run the batch is built but not sent. On success `_written_rows` is increased by
/// the block's row count, and `_bytes_sent` by the encoded row bytes only when not a dry run.
///
/// @param state runtime state providing the timezone and query options.
/// @param block evaluated output block; the first `_output_vexpr_ctxs.size()` columns are used.
/// @return InternalError if a non-const column has fewer rows than the block, any error from
///         binary serialization, or the status returned by `_sinker->add_batch`.
Status VMysqlResultWriter::_write_one_block(RuntimeState* state, Block& block) {
    Status status = Status::OK();
    int num_rows = cast_set<int>(block.rows());
    // convert one batch
    auto result = std::make_shared<TFetchDataResult>();
    result->result_batch.rows.resize(num_rows);
    // Total size of the encoded rows, reported via _bytes_sent.
    uint64_t bytes_sent = 0;
    {
        SCOPED_TIMER(_convert_tuple_timer);

        // Per-column serialization inputs, resolved once before iterating rows.
        struct Arguments {
            // Column as returned by unpack_if_const (holds a single row when is_const).
            const IColumn* column;
            bool is_const;
            DataTypeSerDeSPtr serde;
            // Primitive type of the block column; selects the binary vs. text path below.
            PrimitiveType type;
        };
        auto options = DataTypeSerDe::get_default_format_options();
        options.timezone = &state->timezone_obj();

        const size_t num_cols = _output_vexpr_ctxs.size();
        std::vector<Arguments> arguments;
        arguments.reserve(num_cols);

        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& [column_ptr, col_const] =
                    unpack_if_const(block.get_by_position(col_idx).column);
            int scale = _output_vexpr_ctxs[col_idx]->root()->data_type()->get_scale();
            // DECIMALV2 precision/scale is hard-coded in its data type, so a dedicated serde is
            // built here with precision 27 and the scale taken from the output expression
            // (wrapped in a nullable serde when the expression is nullable).
            DataTypeSerDeSPtr serde;
            if (_output_vexpr_ctxs[col_idx]->root()->data_type()->get_primitive_type() ==
                PrimitiveType::TYPE_DECIMALV2) {
                if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) {
                    auto nested_serde =
                            std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                    serde = std::make_shared<DataTypeNullableSerDe>(nested_serde);
                } else {
                    serde = std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                }
            } else {
                serde = block.get_by_position(col_idx).type->get_serde();
            }
            serde->set_return_object_as_string(output_object_data());
            arguments.emplace_back(column_ptr.get(), col_const, serde,
                                   block.get_by_position(col_idx).type->get_primitive_type());
        }

        // Reject non-const columns that are shorter than the block before touching any row.
        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& argument = arguments[col_idx];
            // const column will only have 1 row, see unpack_if_const
            if (argument.column->size() < num_rows && !argument.is_const) {
                return Status::InternalError(
                        "Required row size is out of range, need {} rows, column {} has {} "
                        "rows in fact.",
                        num_rows, argument.column->get_name(), argument.column->size());
            }
        }
        // Scratch string column that receives text-serialized cells. Each successful write is
        // committed and then read back at write_buffer_index, which counts the committed
        // entries so far (presumably each commit() appends exactly one entry — confirm in
        // BufferWriter).
        auto mysql_output_tmp_col = ColumnString::create();
        BufferWriter write_buffer(*mysql_output_tmp_col);
        size_t write_buffer_index = 0;
        // For non-binary format, we need to call different serialization interfaces
        // write_column_to_mysql/presto/hive text
        if (!_is_binary_format) {
            const auto& serde_dialect = state->query_options().serde_dialect;
            // Returns true if a value was written; false means the cell is sent as NULL.
            auto write_to_text = [serde_dialect](DataTypeSerDeSPtr& serde, const IColumn* column,
                                                 BufferWriter& write_buffer, size_t col_index,
                                                 const DataTypeSerDe::FormatOptions& options) {
                if (serde_dialect == TSerdeDialect::DORIS) {
                    return serde->write_column_to_mysql_text(*column, write_buffer, col_index,
                                                             options);
                } else if (serde_dialect == TSerdeDialect::PRESTO) {
                    return serde->write_column_to_presto_text(*column, write_buffer, col_index,
                                                              options);
                } else if (serde_dialect == TSerdeDialect::HIVE) {
                    return serde->write_column_to_hive_text(*column, write_buffer, col_index,
                                                            options);
                } else {
                    return false;
                }
            };

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                auto& mysql_rows = result->result_batch.rows[row_idx];
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    // Maps row_idx to the row to read (index 0 for const columns).
                    const auto col_index = index_check_const(row_idx, arguments[col_idx].is_const);
                    const auto* column = arguments[col_idx].column;
                    if (write_to_text(arguments[col_idx].serde, column, write_buffer, col_index,
                                      options)) {
                        write_buffer.commit();
                        auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                        direct_write_to_mysql_result_string(mysql_rows, str.data, str.size);
                        write_buffer_index++;
                    } else {
                        direct_write_to_mysql_result_null(mysql_rows);
                    }
                }
                bytes_sent += mysql_rows.size();
            }
        } else {
            // One binary row buffer is reused for every row: it is copied into the result,
            // then reset and restarted for the next row.
            MysqlRowBinaryBuffer row_buffer;

            row_buffer.start_binary_row(_output_vexpr_ctxs.size());

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    auto type = arguments[col_idx].type;
                    if (type == PrimitiveType::TYPE_ARRAY || type == PrimitiveType::TYPE_MAP ||
                        type == PrimitiveType::TYPE_STRUCT ||
                        type == PrimitiveType::TYPE_QUANTILE_STATE ||
                        type == PrimitiveType::TYPE_HLL || type == PrimitiveType::TYPE_BITMAP) {
                        // Complex types are not supported in binary format yet
                        // So use text format serialization interface here
                        const auto col_index =
                                index_check_const(row_idx, arguments[col_idx].is_const);
                        const auto* column = arguments[col_idx].column;
                        if (arguments[col_idx].serde->write_column_to_mysql_text(
                                    *column, write_buffer, col_index, options)) {
                            write_buffer.commit();
                            auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                            row_buffer.push_string(str.data, str.size);
                            write_buffer_index++;
                        } else {
                            row_buffer.push_null();
                        }

                    } else {
                        // Binary serde receives the raw row index plus the const flag.
                        RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql_binary(
                                *(arguments[col_idx].column), row_buffer, row_idx,
                                arguments[col_idx].is_const, options));
                    }
                }

                result->result_batch.rows[row_idx].append(row_buffer.buf(), row_buffer.length());
                bytes_sent += row_buffer.length();
                row_buffer.reset();
                row_buffer.start_binary_row(_output_vexpr_ctxs.size());
            }
        }
    }
    {
        SCOPED_TIMER(_result_send_timer);
        // If this is a dry run task, no need to send data block
        if (!_is_dry_run) {
            status = _sinker->add_batch(state, result);
        }
        if (status.ok()) {
            // Rows are counted even on a dry run; bytes only when actually sent.
            _written_rows += num_rows;
            if (!_is_dry_run) {
                _bytes_sent += bytes_sent;
            }
        } else {
            LOG(WARNING) << "append result batch to sink failed.";
        }
    }
    return status;
}
| |
| Status VMysqlResultWriter::write(RuntimeState* state, Block& input_block) { |
| SCOPED_TIMER(_append_row_batch_timer); |
| Status status = Status::OK(); |
| if (UNLIKELY(input_block.rows() == 0)) { |
| return status; |
| } |
| |
| DCHECK(_output_vexpr_ctxs.empty() != true); |
| |
| // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec |
| // failed, just return the error status |
| Block block; |
| RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, |
| input_block, &block)); |
| const auto total_bytes = block.bytes(); |
| |
| if (total_bytes > config::thrift_max_message_size) [[unlikely]] { |
| const auto total_rows = block.rows(); |
| const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) / |
| config::thrift_max_message_size; |
| const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count; |
| |
| size_t offset = 0; |
| while (offset < total_rows) { |
| size_t rows = std::min(static_cast<size_t>(sub_block_rows), total_rows - offset); |
| auto sub_block = block.clone_empty(); |
| for (size_t i = 0; i != block.columns(); ++i) { |
| sub_block.get_by_position(i).column = |
| block.get_by_position(i).column->cut(offset, rows); |
| } |
| offset += rows; |
| |
| RETURN_IF_ERROR(_write_one_block(state, sub_block)); |
| } |
| return Status::OK(); |
| } |
| |
| return _write_one_block(state, block); |
| } |
| |
| Status VMysqlResultWriter::close(Status) { |
| COUNTER_SET(_sent_rows_counter, _written_rows); |
| COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent); |
| return Status::OK(); |
| } |
| |
| } // namespace doris::vectorized |