blob: c4111c7c2d3117f44a6a5b00f3b84019fbeac3f6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/sink/writer/vtvf_table_writer.h"
#include <fmt/format.h>
#include <utility>
#include "common/status.h"
#include "core/block/block.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "io/file_factory.h"
#include "runtime/runtime_state.h"
namespace doris {
// Builds a writer for a table-valued-function sink.
//
// t_sink        - thrift sink description; its `tvf_table_sink` member is copied and kept
//                 for the lifetime of the writer.
// output_exprs  - output expression contexts, forwarded to AsyncResultWriter.
// dep, fin_dep  - dependencies forwarded to AsyncResultWriter. They arrive by value, so they
//                 are moved into the base instead of being copied a second time (each
//                 shared_ptr copy is an extra atomic reference-count increment/decrement).
VTVFTableWriter::VTVFTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                                 std::shared_ptr<Dependency> dep,
                                 std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, std::move(dep), std::move(fin_dep)),
          // Copy-construct directly rather than default-construct then copy-assign.
          _tvf_sink(t_sink.tvf_table_sink) {}
// Prepares the writer for use.
//
// Remembers the runtime state, registers the row/byte counters and the write/close timers
// under a "VTVFTableWriter" child profile, reads the output path prefix and the optional
// split size from the sink description, and finally opens the first output file.
Status VTVFTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
    _state = state;

    RuntimeProfile* tvf_profile = profile->create_child("VTVFTableWriter", true, true);
    _written_rows_counter = ADD_COUNTER(tvf_profile, "NumWrittenRows", TUnit::UNIT);
    _written_data_bytes = ADD_COUNTER(tvf_profile, "WrittenDataBytes", TUnit::BYTES);
    _file_write_timer = ADD_TIMER(tvf_profile, "FileWriteTime");
    _writer_close_timer = ADD_TIMER(tvf_profile, "FileWriterCloseTime");

    _file_path = _tvf_sink.file_path;

    // An unset limit is stored as 0; write() only rolls over to new files when it is positive.
    _max_file_size_bytes = 0;
    if (_tvf_sink.__isset.max_file_size_bytes) {
        _max_file_size_bytes = _tvf_sink.max_file_size_bytes;
    }

    VLOG_DEBUG << "TVF table writer open, query_id=" << print_id(_state->query_id())
               << ", tvf_name=" << _tvf_sink.tvf_name << ", file_path=" << _file_path
               << ", file_format=" << _tvf_sink.file_format
               << ", file_type=" << _tvf_sink.file_type
               << ", max_file_size_bytes=" << _max_file_size_bytes << ", columns_count="
               << (_tvf_sink.__isset.columns ? _tvf_sink.columns.size() : 0);

    return _create_next_file_writer();
}
// Appends one block to the current output file.
//
// The row counters (profile "NumWrittenRows" and the runtime state's load total) are bumped
// before the write. Time spent in the transformer's write is charged to "FileWriteTime".
// Afterwards the byte count of the current transformer is refreshed. When a positive size
// limit is configured, the writer may then roll over to a new file.
Status VTVFTableWriter::write(RuntimeState* state, Block& block) {
    const auto num_rows = block.rows();
    COUNTER_UPDATE(_written_rows_counter, num_rows);
    state->update_num_rows_load_total(num_rows);

    {
        SCOPED_TIMER(_file_write_timer);
        RETURN_IF_ERROR(_vfile_writer->write(block));
    }

    _current_written_bytes = _vfile_writer->written_len();
    if (_max_file_size_bytes <= 0) {
        // Auto-split disabled.
        return Status::OK();
    }
    return _create_new_file_if_exceed_size();
}
// Finishes the sink.
//
// A failed upstream status is returned unchanged and the output file is not closed here.
// Otherwise the current file is closed without opening a follow-up file, and the close is
// timed by "FileWriterCloseTime".
Status VTVFTableWriter::close(Status status) {
    if (status.ok()) {
        SCOPED_TIMER(_writer_close_timer);
        return _close_file_writer(true);
    }
    return status;
}
// Creates the output file `file_name` and wraps it in a format transformer, then opens it.
//
// Unless the sink selects the JNI writer type, a native file writer is first obtained from
// FileFactory. It uses the sink's file_type and its properties (empty when unset), with
// `write_file_cache` and `sync_file_data` both false. The transformer is created by
// create_tvf_format_transformer into _vfile_writer.
Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
    const bool use_jni =
            _tvf_sink.__isset.writer_type && _tvf_sink.writer_type == TTVFWriterType::JNI;

    if (!use_jni) {
        const std::map<std::string, std::string> properties =
                _tvf_sink.__isset.properties ? _tvf_sink.properties
                                             : std::map<std::string, std::string> {};
        _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
                _tvf_sink.file_type, _state->exec_env(), {}, properties, file_name,
                {.write_file_cache = false, .sync_file_data = false}));
    }

    // The JNI transformer is handed no native file writer (nullptr).
    RETURN_IF_ERROR(create_tvf_format_transformer(
            _tvf_sink, _state, use_jni ? nullptr : _file_writer_impl.get(),
            _vec_output_expr_ctxs, &_vfile_writer));

    VLOG_DEBUG << "TVF table writer created file: " << file_name
               << ", format=" << _tvf_sink.file_format << ", use_jni=" << use_jni
               << ", query_id=" << print_id(_state->query_id());

    return _vfile_writer->open();
}
// Allocates the next file name (advancing the file index) and opens a writer for it.
Status VTVFTableWriter::_create_next_file_writer() {
    std::string next_name;
    RETURN_IF_ERROR(_get_next_file_name(&next_name));
    return _create_file_writer(next_name);
}
// Closes the current output file and, unless `done` is set, opens the next one.
//
// If a format transformer exists, it is closed, its written length is added to
// "WrittenDataBytes", and it is released. Without a transformer, a native file writer that is
// still not CLOSED is closed directly. This presumably happens when transformer creation
// failed after the native file was created.
// NOTE(review): relies on the transformer's close() also closing _file_writer_impl; confirm
// in the transformer implementations.
Status VTVFTableWriter::_close_file_writer(bool done) {
    if (_vfile_writer != nullptr) {
        RETURN_IF_ERROR(_vfile_writer->close());
        COUNTER_UPDATE(_written_data_bytes, _vfile_writer->written_len());
        _vfile_writer.reset();
    } else if (_file_writer_impl != nullptr &&
               _file_writer_impl->state() != io::FileWriter::State::CLOSED) {
        RETURN_IF_ERROR(_file_writer_impl->close());
    }

    if (done) {
        return Status::OK();
    }
    return _create_next_file_writer();
}
// Rolls over to a new output file once the current one has reached the size limit.
//
// Nothing happens when auto-splitting is disabled (_max_file_size_bytes <= 0) or when the
// bytes written to the current file are still below the limit. Otherwise the current file is
// closed, the next one is created, and the byte count is reset.
//
// Only the close is charged to _writer_close_timer ("FileWriterCloseTime"). Creating and
// opening the next file is kept outside the timer, so the counter measures close cost only,
// matching how close() uses it. Previously the timer also covered the next file's creation.
Status VTVFTableWriter::_create_new_file_if_exceed_size() {
    if (_max_file_size_bytes <= 0 || _current_written_bytes < _max_file_size_bytes) {
        return Status::OK();
    }
    {
        SCOPED_TIMER(_writer_close_timer);
        RETURN_IF_ERROR(_close_file_writer(true));
    }
    RETURN_IF_ERROR(_create_next_file_writer());
    _current_written_bytes = 0;
    return Status::OK();
}
// Produces the path of the next output file and advances the file index.
//
// _file_path is used as a prefix; the result has the form "{prefix}{query_id}_{index}.{ext}".
// ext is "csv", "parquet" or "orc" for those formats and "dat" for any other. Always
// returns OK.
Status VTVFTableWriter::_get_next_file_name(std::string* file_name) {
    const std::string ext = [this]() -> std::string {
        switch (_tvf_sink.file_format) {
        case TFileFormatType::FORMAT_CSV_PLAIN:
            return "csv";
        case TFileFormatType::FORMAT_PARQUET:
            return "parquet";
        case TFileFormatType::FORMAT_ORC:
            return "orc";
        default:
            return "dat";
        }
    }();

    *file_name = fmt::format("{}{}_{}.{}", _file_path, print_id(_state->query_id()), _file_idx,
                             ext);
    ++_file_idx;
    return Status::OK();
}
} // namespace doris