blob: 89071563c653cad152006b176fefdd60cffa3c8e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "jdbc_jni_reader.h"
#include <sstream>
#include "core/block/block.h"
#include "core/column/column_nullable.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_string.h"
#include "core/types.h"
#include "exprs/function/simple_function_factory.h"
#include "format/jni/jni_data_bridge.h"
#include "runtime/descriptors.h"
#include "util/jdbc_utils.h"
namespace doris {
#include "common/compile_check_begin.h"
JdbcJniReader::JdbcJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const std::map<std::string, std::string>& jdbc_params)
: JniReader(
file_slot_descs, state, profile, "org/apache/doris/jdbc/JdbcJniScanner",
[&]() {
std::ostringstream required_fields;
std::ostringstream columns_types;
std::ostringstream replace_string;
int index = 0;
for (const auto& desc : file_slot_descs) {
std::string field = desc->col_name();
std::string type =
JniDataBridge::get_jni_type_with_different_string(desc->type());
// Determine replace_string for special types
// (bitmap, hll, quantile_state, jsonb)
std::string replace_type = "not_replace";
auto ptype = desc->type()->get_primitive_type();
if (ptype == PrimitiveType::TYPE_BITMAP) {
replace_type = "bitmap";
} else if (ptype == PrimitiveType::TYPE_HLL) {
replace_type = "hll";
} else if (ptype == PrimitiveType::TYPE_JSONB) {
replace_type = "jsonb";
} else if (ptype == PrimitiveType::TYPE_QUANTILE_STATE) {
replace_type = "quantile_state";
}
if (index == 0) {
required_fields << field;
columns_types << type;
replace_string << replace_type;
} else {
required_fields << "," << field;
columns_types << "#" << type;
replace_string << "," << replace_type;
}
index++;
}
// Merge JDBC-specific params with schema params
std::map<std::string, std::string> params = jdbc_params;
params["required_fields"] = required_fields.str();
params["columns_types"] = columns_types.str();
params["replace_string"] = replace_string.str();
// Resolve jdbc_driver_url to absolute file:// URL
if (params.count("jdbc_driver_url")) {
std::string resolved;
if (JdbcUtils::resolve_driver_url(params["jdbc_driver_url"], &resolved)
.ok()) {
params["jdbc_driver_url"] = resolved;
}
}
return params;
}(),
[&]() {
std::vector<std::string> names;
for (const auto& desc : file_slot_descs) {
names.emplace_back(desc->col_name());
}
return names;
}()),
_jdbc_params(jdbc_params) {}
Status JdbcJniReader::init_reader() {
return open(_state, _profile);
}
bool JdbcJniReader::_is_special_type(PrimitiveType type) {
return type == PrimitiveType::TYPE_BITMAP || type == PrimitiveType::TYPE_HLL ||
type == PrimitiveType::TYPE_QUANTILE_STATE || type == PrimitiveType::TYPE_JSONB;
}
Status JdbcJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
// Identify columns with special types (bitmap, HLL, quantile_state, JSONB)
// and temporarily replace them with string columns for JNI data transfer.
// This follows the same pattern as the old vjdbc_connector.cpp _get_reader_params.
struct SpecialColumnInfo {
int block_idx;
DataTypePtr original_type;
ColumnPtr original_column;
};
std::vector<SpecialColumnInfo> special_columns;
auto name_to_pos_map = block->get_name_to_pos_map();
const auto& slots = _file_slot_descs;
for (size_t i = 0; i < slots.size(); ++i) {
auto* slot = slots[i];
auto ptype = slot->type()->get_primitive_type();
if (_is_special_type(ptype)) {
// Find the block index for this column
int block_idx = name_to_pos_map[slot->col_name()];
auto& col_with_type = block->get_by_position(block_idx);
SpecialColumnInfo info;
info.block_idx = block_idx;
info.original_type = col_with_type.type;
info.original_column = col_with_type.column;
special_columns.push_back(info);
// Replace block column with string type
DataTypePtr string_type = std::make_shared<DataTypeString>();
if (slot->is_nullable()) {
string_type = make_nullable(string_type);
}
block->get_by_position(block_idx).column =
string_type->create_column()->convert_to_full_column_if_const();
block->get_by_position(block_idx).type = string_type;
}
}
// Call parent to do the actual JNI read with string columns
RETURN_IF_ERROR(JniReader::get_next_block(block, read_rows, eof));
// Cast string columns back to their target types
if (*read_rows > 0 && !special_columns.empty()) {
for (size_t i = 0; i < slots.size(); ++i) {
auto* slot = slots[i];
auto ptype = slot->type()->get_primitive_type();
if (_is_special_type(ptype)) {
int block_idx = name_to_pos_map[slot->col_name()];
RETURN_IF_ERROR(_cast_string_to_special_type(slot, block, block_idx,
static_cast<int>(*read_rows)));
}
}
} else if (special_columns.empty()) {
// No special columns, nothing to do
} else {
// No rows read but we replaced columns, restore original types for next call
for (auto& info : special_columns) {
block->get_by_position(info.block_idx).type = info.original_type;
block->get_by_position(info.block_idx).column = info.original_column;
}
}
return Status::OK();
}
Status JdbcJniReader::_cast_string_to_special_type(const SlotDescriptor* slot_desc, Block* block,
int column_index, int num_rows) {
DataTypePtr target_data_type = slot_desc->get_data_type_ptr();
std::string target_data_type_name = target_data_type->get_name();
// Build input string type (nullable if slot is nullable)
DataTypePtr input_string_type;
if (slot_desc->is_nullable()) {
input_string_type = make_nullable(std::make_shared<DataTypeString>());
} else {
input_string_type = std::make_shared<DataTypeString>();
}
auto& input_col = block->get_by_position(column_index).column;
// Build CAST function arguments
DataTypePtr cast_param_data_type = target_data_type;
ColumnPtr cast_param = cast_param_data_type->create_column_const_with_default_value(1);
ColumnsWithTypeAndName argument_template;
argument_template.reserve(2);
argument_template.emplace_back(std::move(input_col), input_string_type, "java.sql.String");
argument_template.emplace_back(cast_param, cast_param_data_type, target_data_type_name);
FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
"CAST", argument_template, make_nullable(target_data_type));
if (func_cast == nullptr) {
return Status::InternalError("Failed to find CAST function for type {}",
target_data_type_name);
}
Block cast_block(argument_template);
int result_idx = cast_block.columns();
cast_block.insert({nullptr, make_nullable(target_data_type), "cast_result"});
RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0}, result_idx, num_rows));
auto res_col = cast_block.get_by_position(result_idx).column;
block->get_by_position(column_index).type = target_data_type;
if (target_data_type->is_nullable()) {
block->replace_by_position(column_index, res_col);
} else {
auto nested_ptr =
reinterpret_cast<const ColumnNullable*>(res_col.get())->get_nested_column_ptr();
block->replace_by_position(column_index, nested_ptr);
}
return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris