| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "result_file_sink_operator.h" |
| |
| #include <memory> |
| #include <random> |
| |
| #include "pipeline/exec/exchange_sink_buffer.h" |
| #include "pipeline/exec/operator.h" |
| #include "pipeline/exec/result_sink_operator.h" |
| #include "runtime/result_block_buffer.h" |
| #include "runtime/result_buffer_mgr.h" |
| #include "vec/sink/vdata_stream_sender.h" |
| |
| namespace doris::pipeline { |
| #include "common/compile_check_begin.h" |
| |
| ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent, |
| RuntimeState* state) |
| : AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state) {} |
| |
| ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc, |
| const std::vector<TExpr>& t_output_expr) |
| : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(), |
| std::numeric_limits<int>::max()), |
| _row_desc(row_desc), |
| _t_output_expr(t_output_expr) {} |
| |
| ResultFileSinkOperatorX::ResultFileSinkOperatorX( |
| int operator_id, const RowDescriptor& row_desc, const TResultFileSink& sink, |
| const std::vector<TPlanFragmentDestination>& destinations, |
| const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs) |
| : DataSinkOperatorX(operator_id, std::numeric_limits<int>::max(), |
| std::numeric_limits<int>::max()), |
| _row_desc(row_desc), |
| _t_output_expr(t_output_expr), |
| _dests(destinations), |
| _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id)) { |
| CHECK_EQ(destinations.size(), 1); |
| } |
| |
| ResultFileSinkLocalState::~ResultFileSinkLocalState() = default; |
| |
| Status ResultFileSinkOperatorX::init(const TDataSink& tsink) { |
| RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::init(tsink)); |
| const auto& sink = tsink.result_file_sink; |
| CHECK(sink.__isset.file_options); |
| _file_opts = std::make_unique<ResultFileOptions>(sink.file_options); |
| CHECK(sink.__isset.storage_backend_type); |
| _storage_type = sink.storage_backend_type; |
| |
| //for impl csv_with_name and csv_with_names_and_types |
| _header_type = sink.header_type; |
| _header = sink.header; |
| |
| // From the thrift expressions create the real exprs. |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); |
| return Status::OK(); |
| } |
| |
| Status ResultFileSinkOperatorX::prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::prepare(state)); |
| RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); |
| if (state->query_options().enable_parallel_outfile) { |
| RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( |
| state->query_id(), _buf_size, &_sender, state, false, nullptr)); |
| } |
| return vectorized::VExpr::open(_output_vexpr_ctxs, state); |
| } |
| |
| Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { |
| RETURN_IF_ERROR(Base::init(state, info)); |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_init_timer); |
| _sender_id = info.sender_id; |
| |
| auto& p = _parent->cast<ResultFileSinkOperatorX>(); |
| CHECK(p._file_opts.get() != nullptr); |
| // create sender |
| if (state->query_options().enable_parallel_outfile) { |
| _sender = _parent->cast<ResultFileSinkOperatorX>()._sender; |
| } else { |
| RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( |
| state->fragment_instance_id(), p._buf_size, &_sender, state, false, nullptr)); |
| } |
| _sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this()); |
| |
| // create writer |
| _writer.reset(new (std::nothrow) vectorized::VFileResultWriter( |
| p._file_opts.get(), p._storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, |
| _sender, nullptr, state->return_object_data_as_binary(), p._output_row_descriptor, |
| _async_writer_dependency, _finish_dependency)); |
| _writer->set_header_info(p._header_type, p._header); |
| return Status::OK(); |
| } |
| |
| Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) { |
| if (Base::_closed) { |
| return Status::OK(); |
| } |
| SCOPED_TIMER(_close_timer); |
| SCOPED_TIMER(exec_time_counter()); |
| |
| if (_closed) { |
| return Status::OK(); |
| } |
| |
| Status final_status = exec_status; |
| // For pipelinex engine, the writer is closed in async thread process_block |
| if (_writer) { |
| Status st = _writer->get_writer_status(); |
| if (!st.ok() && exec_status.ok()) { |
| // close file writer failed, should return this error to client |
| final_status = st; |
| } |
| } |
| // close sender, this is normal path end |
| if (_sender) { |
| int64_t written_rows = _writer == nullptr ? 0 : _writer->get_written_rows(); |
| state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows); |
| RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows)); |
| } |
| state->exec_env()->result_mgr()->cancel_at_time( |
| time(nullptr) + config::result_buffer_cancelled_interval_time, |
| state->fragment_instance_id()); |
| |
| return Base::close(state, exec_status); |
| } |
| |
| Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { |
| auto& local_state = get_local_state(state); |
| SCOPED_TIMER(local_state.exec_time_counter()); |
| COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); |
| return local_state.sink(state, in_block, eos); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris::pipeline |