| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/table_connector.h" |
| |
| // IWYU pragma: no_include <bthread/errno.h> |
| #include <errno.h> // IWYU pragma: keep |
| #include <gen_cpp/Metrics_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <glog/logging.h> |
| #include <iconv.h> |
| |
| #include <memory> |
| #include <string_view> |
| #include <type_traits> |
| |
| #include "runtime/decimalv2_value.h" |
| #include "runtime/define_primitive_type.h" |
| #include "util/binary_cast.hpp" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_array.h" |
| #include "vec/columns/column_nullable.h" |
| #include "vec/common/assert_cast.h" |
| #include "vec/core/block.h" |
| #include "vec/data_types/data_type.h" |
| #include "vec/data_types/data_type_array.h" |
| #include "vec/data_types/data_type_date_or_datetime_v2.h" |
| #include "vec/data_types/data_type_nullable.h" |
| #include "vec/runtime/vdatetime_value.h" |
| |
| namespace doris { |
| class TupleDescriptor; |
| |
| TableConnector::TableConnector(const TupleDescriptor* tuple_desc, bool use_transaction, |
| std::string_view table_name, const std::string& sql_str) |
| : _is_open(false), |
| _use_tranaction(use_transaction), |
| _is_in_transaction(false), |
| _table_name(table_name), |
| _tuple_desc(tuple_desc), |
| _sql_str(sql_str) {} |
| |
| void TableConnector::init_profile(doris::RuntimeProfile* profile) { |
| _convert_tuple_timer = ADD_TIMER(profile, "TupleConvertTime"); |
| _result_send_timer = ADD_TIMER(profile, "ResultSendTime"); |
| _sent_rows_counter = ADD_COUNTER(profile, "NumSentRows", TUnit::UNIT); |
| } |
| |
| std::u16string TableConnector::utf8_to_u16string(const char* first, const char* last) { |
| auto deleter = [](auto convertor) { |
| if (convertor == reinterpret_cast<decltype(convertor)>(-1)) { |
| return; |
| } |
| iconv_close(convertor); |
| }; |
| std::unique_ptr<std::remove_pointer_t<iconv_t>, decltype(deleter)> convertor( |
| iconv_open("UTF-16LE", "UTF-8"), deleter); |
| |
| char* in = const_cast<char*>(first); |
| size_t inbytesleft = last - first; |
| |
| char16_t buffer[1024]; |
| char* out = reinterpret_cast<char*>(&buffer[0]); |
| size_t outbytesleft = sizeof(buffer); |
| |
| std::u16string result; |
| while (inbytesleft > 0) { |
| if (iconv(convertor.get(), &in, &inbytesleft, &out, &outbytesleft)) { |
| if (errno == E2BIG) { |
| result += std::u16string_view(buffer, |
| (sizeof(buffer) - outbytesleft) / sizeof(char16_t)); |
| out = reinterpret_cast<char*>(&buffer[0]); |
| outbytesleft = sizeof(buffer); |
| } else { |
| LOG(WARNING) << fmt::format("Failed to convert the UTF-8 string {} to UTF-16LE", |
| std::string(first, last)); |
| return result; |
| } |
| } |
| } |
| result += std::u16string_view(buffer, (sizeof(buffer) - outbytesleft) / sizeof(char16_t)); |
| return result; |
| } |
| |
| Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr, |
| const vectorized::DataTypePtr& type_ptr, |
| const vectorized::DataTypePtr& type, int row, |
| TOdbcTableType::type table_type) { |
| auto extra_convert_func = [&](const std::string_view& str, const bool& is_date) -> void { |
| if (table_type == TOdbcTableType::ORACLE || table_type == TOdbcTableType::SAP_HANA) { |
| //if is ORACLE and date type, insert into need convert |
| if (is_date) { |
| fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd')", str); |
| } else { |
| fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd hh24:mi:ss')", str); |
| } |
| } else if (table_type == TOdbcTableType::POSTGRESQL) { |
| fmt::format_to(_insert_stmt_buffer, "'{}'::date", str); |
| } else if (table_type == TOdbcTableType::SQLSERVER) { |
| // Values in sqlserver should be enclosed by single quotes |
| fmt::format_to(_insert_stmt_buffer, "'{}'", str); |
| } else { |
| fmt::format_to(_insert_stmt_buffer, "\"{}\"", str); |
| } |
| }; |
| const vectorized::IColumn* column = column_ptr.get(); |
| if (type_ptr->is_nullable()) { |
| const auto* nullable_column = |
| assert_cast<const vectorized::ColumnNullable*>(column_ptr.get()); |
| if (nullable_column->is_null_at(row)) { |
| fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); |
| return Status::OK(); |
| } |
| column = nullable_column->get_nested_column_ptr().get(); |
| } else { |
| column = column_ptr.get(); |
| } |
| auto [item, size] = column->get_data_at(row); |
| switch (type->get_primitive_type()) { |
| case TYPE_BOOLEAN: |
| if (table_type == TOdbcTableType::SAP_HANA) { |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const bool*>(item)); |
| } else { |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int8_t*>(item)); |
| } |
| break; |
| case TYPE_TINYINT: |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int8_t*>(item)); |
| break; |
| case TYPE_SMALLINT: |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int16_t*>(item)); |
| break; |
| case TYPE_INT: |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int32_t*>(item)); |
| break; |
| case TYPE_BIGINT: |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int64_t*>(item)); |
| break; |
| case TYPE_FLOAT: |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const float*>(item)); |
| break; |
| case TYPE_DOUBLE: |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const double*>(item)); |
| break; |
| case TYPE_DATE: { |
| VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item); |
| |
| char buf[64]; |
| char* pos = value.to_string(buf); |
| std::string_view str(buf, pos - buf - 1); |
| extra_convert_func(str, true); |
| break; |
| } |
| case TYPE_DATETIME: { |
| VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item); |
| |
| char buf[64]; |
| char* pos = value.to_string(buf); |
| std::string_view str(buf, pos - buf - 1); |
| extra_convert_func(str, false); |
| break; |
| } |
| case TYPE_DATEV2: { |
| DateV2Value<DateV2ValueType> value = |
| binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item); |
| |
| char buf[64]; |
| char* pos = value.to_string(buf); |
| std::string str(buf, pos - buf - 1); |
| extra_convert_func(str, true); |
| break; |
| } |
| case TYPE_DATETIMEV2: { |
| DateV2Value<DateTimeV2ValueType> value = |
| binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item); |
| |
| char buf[64]; |
| char* pos = value.to_string(buf, type->get_scale()); |
| std::string str(buf, pos - buf - 1); |
| extra_convert_func(str, false); |
| break; |
| } |
| case TYPE_VARCHAR: |
| case TYPE_CHAR: |
| case TYPE_STRING: { |
| // for oracle/pg database string must be ' |
| if (table_type == TOdbcTableType::ORACLE || table_type == TOdbcTableType::POSTGRESQL || |
| table_type == TOdbcTableType::SAP_HANA || table_type == TOdbcTableType::MYSQL || |
| table_type == TOdbcTableType::CLICKHOUSE || table_type == TOdbcTableType::SQLSERVER) { |
| fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size)); |
| } else { |
| fmt::format_to(_insert_stmt_buffer, "\"{}\"", fmt::basic_string_view(item, size)); |
| } |
| break; |
| } |
| case TYPE_ARRAY: { |
| auto& arr_nested = reinterpret_cast<const vectorized::ColumnArray*>(column)->get_data_ptr(); |
| auto& arr_offset = reinterpret_cast<const vectorized::ColumnArray*>(column)->get_offsets(); |
| auto array_type = remove_nullable(type_ptr); |
| auto nested_type = |
| reinterpret_cast<const vectorized::DataTypeArray&>(*array_type).get_nested_type(); |
| |
| //for doris、CK insert into ---> [] |
| //for PG insert into ---> ARRAY[] |
| if (table_type == TOdbcTableType::POSTGRESQL) { |
| fmt::format_to(_insert_stmt_buffer, "{}", "ARRAY["); |
| } else if (table_type == TOdbcTableType::CLICKHOUSE || |
| table_type == TOdbcTableType::MYSQL) { |
| fmt::format_to(_insert_stmt_buffer, "{}", "["); |
| } |
| bool first_value = true; |
| for (auto idx = arr_offset[row - 1]; idx < arr_offset[row]; ++idx) { |
| if (first_value == false) { |
| fmt::format_to(_insert_stmt_buffer, "{}", ", "); |
| } |
| if (arr_nested->is_null_at(idx)) { |
| fmt::format_to(_insert_stmt_buffer, "{}", "NULL"); |
| } else { |
| RETURN_IF_ERROR(convert_column_data(arr_nested, nested_type, |
| assert_cast<const vectorized::DataTypeArray*>( |
| vectorized::remove_nullable(type).get()) |
| ->get_nested_type(), |
| idx, table_type)); |
| } |
| first_value = false; |
| } |
| fmt::format_to(_insert_stmt_buffer, "{}", "]"); |
| break; |
| } |
| case TYPE_DECIMALV2: { |
| DecimalV2Value value = *(DecimalV2Value*)(item); |
| fmt::format_to(_insert_stmt_buffer, "{}", value.to_string()); |
| break; |
| } |
| case TYPE_DECIMAL32: |
| case TYPE_DECIMAL64: |
| case TYPE_DECIMAL128I: |
| case TYPE_DECIMAL256: { |
| auto decimal_type = remove_nullable(type_ptr); |
| auto val = decimal_type->to_string(*column, row); |
| fmt::format_to(_insert_stmt_buffer, "{}", val); |
| break; |
| } |
| case TYPE_LARGEINT: { |
| fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const __int128*>(item)); |
| break; |
| } |
| default: { |
| return Status::InternalError("can't convert this type to mysql type. type = {}", |
| type->get_name()); |
| } |
| } |
| return Status::OK(); |
| } |
| } // namespace doris |