| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <gen_cpp/Types_types.h> |
| #include <stddef.h> |
| #include <stdint.h> |
| |
| #include <cstdint> |
| #include <iosfwd> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include "common/status.h" |
| #include "io/fs/file_writer.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/result_block_buffer.h" |
| #include "util/runtime_profile.h" |
| #include "vec/core/block.h" |
| #include "vec/runtime/vfile_format_transformer.h" |
| #include "vec/sink/writer/async_result_writer.h" |
| |
| namespace doris { |
| class ResultBlockBufferBase; |
| class RuntimeState; |
| |
| namespace vectorized { |
| class GetResultBatchCtx; |
| using MySQLResultBlockBuffer = ResultBlockBuffer<GetResultBatchCtx>; |
| class VExprContext; |
| } // namespace vectorized |
| namespace pipeline { |
| struct ResultFileOptions; |
| } |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| |
| // write result to file |
| class VFileResultWriter final : public AsyncResultWriter { |
| public: |
| VFileResultWriter(const pipeline::ResultFileOptions* file_option, |
| const TStorageBackendType::type storage_type, |
| const TUniqueId fragment_instance_id, |
| const VExprContextSPtrs& _output_vexpr_ctxs, |
| std::shared_ptr<ResultBlockBufferBase> sinker, Block* output_block, |
| bool output_object_data, const RowDescriptor& output_row_descriptor, |
| std::shared_ptr<pipeline::Dependency> dep, |
| std::shared_ptr<pipeline::Dependency> fin_dep); |
| |
| VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
| std::shared_ptr<pipeline::Dependency> dep, |
| std::shared_ptr<pipeline::Dependency> fin_dep); |
| |
| Status write(RuntimeState* state, Block& block) override; |
| |
| Status close(Status exec_status) override; |
| |
| Status open(RuntimeState* state, RuntimeProfile* profile) override; |
| |
| // file result writer always return statistic result in one row |
| int64_t get_written_rows() const override { return 1; } |
| |
| void set_header_info(const std::string& header_type, const std::string& header) { |
| _header_type = header_type; |
| _header = header; |
| } |
| |
| private: |
| Status _write_file(const Block& block); |
| |
| void _init_profile(RuntimeProfile*); |
| |
| Status _create_file_writer(const std::string& file_name); |
| Status _create_next_file_writer(); |
| // get next export file name |
| Status _get_next_file_name(std::string* file_name); |
| void _get_file_url(std::string* file_url); |
| std::string _file_format_to_name(); |
| // close file writer, and if !done, it will create new writer for next file. |
| Status _close_file_writer(bool done); |
| // create a new file if current file size exceed limit |
| Status _create_new_file_if_exceed_size(); |
| // send the final statistic result |
| Status _send_result(); |
| // save result into batch rather than send it |
| Status _fill_result_block(); |
| // delete the dir of file_path |
| Status _delete_dir(); |
| double _get_write_speed(int64_t write_bytes, int64_t write_time); |
| std::string _compression_type_to_name(); |
| |
| private: |
| RuntimeState* _state; // not owned, set when init |
| const pipeline::ResultFileOptions* _file_opts = nullptr; |
| TStorageBackendType::type _storage_type; |
| TUniqueId _fragment_instance_id; |
| |
| // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. |
| // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. |
| std::unique_ptr<doris::io::FileWriter> _file_writer_impl; |
| // Used to buffer the export data of plain text |
| // TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid calling |
| // file writer's write() for every single row. |
| // But this cannot solve the problem of a row of data that is too large. |
| // For example: bitmap_to_string() may return large volume of data. |
| // And the speed is relative low, in my test, is about 6.5MB/s. |
| std::stringstream _plain_text_outstream; |
| |
| // current written bytes, used for split data |
| int64_t _current_written_bytes = 0; |
| // the suffix idx of export file name, start at 0 |
| int _file_idx = 0; |
| |
| // total time cost on append batch operation |
| RuntimeProfile::Counter* _append_row_batch_timer = nullptr; |
| // tuple convert timer, child timer of _append_row_batch_timer |
| RuntimeProfile::Counter* _convert_tuple_timer = nullptr; |
| // file write timer, child timer of _append_row_batch_timer |
| RuntimeProfile::Counter* _file_write_timer = nullptr; |
| // time of closing the file writer |
| RuntimeProfile::Counter* _writer_close_timer = nullptr; |
| // number of written rows |
| RuntimeProfile::Counter* _written_rows_counter = nullptr; |
| // bytes of written data |
| RuntimeProfile::Counter* _written_data_bytes = nullptr; |
| |
| // _sinker and _output_batch are not owned by FileResultWriter |
| std::shared_ptr<MySQLResultBlockBuffer> _sinker = nullptr; |
| Block* _output_block = nullptr; |
| // set to true if the final statistic result is sent |
| bool _is_result_sent = false; |
| RowDescriptor _output_row_descriptor; |
| // convert block to parquet/orc/csv fomrat |
| std::unique_ptr<VFileFormatTransformer> _vfile_writer; |
| |
| std::string_view _header_type; |
| std::string_view _header; |
| }; |
| } // namespace doris::vectorized |