blob: 7f97a7c0745a55d0a344286dc827050416f5d87b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vjdbc_table_writer.h"
#include <gen_cpp/DataSinks_types.h>
#include <stdint.h>
#include <sstream>
#include "common/logging.h"
#include "core/block/block.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "runtime/runtime_state.h"
#include "util/jdbc_utils.h"
namespace doris {
std::map<std::string, std::string> VJdbcTableWriter::_build_writer_params(const TDataSink& t_sink) {
const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink;
std::map<std::string, std::string> params;
params["jdbc_url"] = t_jdbc_sink.jdbc_table.jdbc_url;
params["jdbc_user"] = t_jdbc_sink.jdbc_table.jdbc_user;
params["jdbc_password"] = t_jdbc_sink.jdbc_table.jdbc_password;
params["jdbc_driver_class"] = t_jdbc_sink.jdbc_table.jdbc_driver_class;
// Resolve jdbc_driver_url to absolute file:// URL
std::string driver_url;
auto resolve_st =
JdbcUtils::resolve_driver_url(t_jdbc_sink.jdbc_table.jdbc_driver_url, &driver_url);
if (!resolve_st.ok()) {
LOG(WARNING) << "Failed to resolve JDBC driver URL: " << resolve_st.to_string();
driver_url = t_jdbc_sink.jdbc_table.jdbc_driver_url;
}
params["jdbc_driver_url"] = driver_url;
params["jdbc_driver_checksum"] = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
params["insert_sql"] = t_jdbc_sink.insert_sql;
params["use_transaction"] = t_jdbc_sink.use_transaction ? "true" : "false";
params["catalog_id"] = std::to_string(t_jdbc_sink.jdbc_table.catalog_id);
params["connection_pool_min_size"] =
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_min_size);
params["connection_pool_max_size"] =
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_size);
params["connection_pool_max_wait_time"] =
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_wait_time);
params["connection_pool_max_life_time"] =
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_life_time);
params["connection_pool_keep_alive"] =
t_jdbc_sink.jdbc_table.connection_pool_keep_alive ? "true" : "false";
return params;
}
VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs,
std::shared_ptr<Dependency> dep,
std::shared_ptr<Dependency> fin_dep)
: AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
_writer_params(_build_writer_params(t_sink)),
_use_transaction(t_sink.jdbc_table_sink.use_transaction) {}
Status VJdbcTableWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) {
_writer = std::make_unique<VJniFormatTransformer>(
state, _vec_output_expr_ctxs, "org/apache/doris/jdbc/JdbcJniWriter", _writer_params);
return _writer->open();
}
Status VJdbcTableWriter::write(RuntimeState* state, Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
if (output_block.rows() == 0) {
return Status::OK();
}
return _writer->write(output_block);
}
Status VJdbcTableWriter::finish(RuntimeState* state) {
if (!_use_transaction || !_writer) {
return Status::OK();
}
// Transaction commit is handled in JdbcJniWriter.close() on the Java side.
// When useTransaction=true, close() calls conn.commit() before closing the connection.
return Status::OK();
}
Status VJdbcTableWriter::close(Status s) {
if (_writer) {
return _writer->close();
}
return Status::OK();
}
} // namespace doris