| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <gen_cpp/PlanNodes_types.h> |
| #include <stdint.h> |
| |
| #include "operator.h" |
| #include "runtime/result_block_buffer.h" |
| #include "runtime/result_writer.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class ResultBlockBufferBase; |
| |
| namespace pipeline { |
| |
| struct ResultFileOptions { |
| // [[deprecated]] |
| bool is_local_file; |
| std::string file_path; |
| TFileFormatType::type file_format; |
| std::string column_separator; |
| std::string line_delimiter; |
| size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB |
| std::vector<TNetworkAddress> broker_addresses; |
| std::map<std::string, std::string> broker_properties; |
| std::string success_file_name; |
| std::vector<std::vector<std::string>> schema; //not use in outfile with parquet format |
| std::map<std::string, std::string> file_properties; //not use in outfile with parquet format |
| |
| std::vector<TParquetSchema> parquet_schemas; |
| TParquetCompressionType::type parquet_commpression_type; |
| TParquetVersion::type parquet_version; |
| bool parquert_disable_dictionary = false; |
| bool enable_int96_timestamps = false; |
| //note: use outfile with parquet format, have deprecated 9:schema and 10:file_properties |
| //But in order to consider the compatibility when upgrading, so add a bool to check |
| //Now the code version is 1.1.2, so when the version is after 1.2, could remove this code. |
| bool is_refactor_before_flag = false; |
| std::string orc_schema; |
| TFileCompressType::type orc_compression_type; |
| // currently only for csv |
| // TODO: we should merge parquet_commpression_type/orc_compression_type/compression_type |
| TFileCompressType::type compression_type = TFileCompressType::PLAIN; |
| |
| bool delete_existing_files = false; |
| std::string file_suffix; |
| //Bring BOM when exporting to CSV format |
| bool with_bom = false; |
| int64_t orc_writer_version = 0; |
| |
| ResultFileOptions(const TResultFileSinkOptions& t_opt) { |
| file_path = t_opt.file_path; |
| file_format = t_opt.file_format; |
| column_separator = t_opt.__isset.column_separator ? t_opt.column_separator : "\t"; |
| line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n"; |
| max_file_size_bytes = |
| t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes : max_file_size_bytes; |
| delete_existing_files = |
| t_opt.__isset.delete_existing_files ? t_opt.delete_existing_files : false; |
| file_suffix = t_opt.file_suffix; |
| with_bom = t_opt.with_bom; |
| |
| is_local_file = true; |
| if (t_opt.__isset.broker_addresses) { |
| broker_addresses = t_opt.broker_addresses; |
| is_local_file = false; |
| } |
| if (t_opt.__isset.broker_properties) { |
| broker_properties = t_opt.broker_properties; |
| } |
| if (t_opt.__isset.success_file_name) { |
| success_file_name = t_opt.success_file_name; |
| } |
| if (t_opt.__isset.schema) { |
| schema = t_opt.schema; |
| is_refactor_before_flag = true; |
| } |
| if (t_opt.__isset.file_properties) { |
| file_properties = t_opt.file_properties; |
| } |
| if (t_opt.__isset.parquet_schemas) { |
| is_refactor_before_flag = false; |
| parquet_schemas = t_opt.parquet_schemas; |
| } |
| if (t_opt.__isset.parquet_compression_type) { |
| parquet_commpression_type = t_opt.parquet_compression_type; |
| } |
| if (t_opt.__isset.parquet_disable_dictionary) { |
| parquert_disable_dictionary = t_opt.parquet_disable_dictionary; |
| } |
| if (t_opt.__isset.parquet_version) { |
| parquet_version = t_opt.parquet_version; |
| } |
| if (t_opt.__isset.enable_int96_timestamps) { |
| enable_int96_timestamps = t_opt.enable_int96_timestamps; |
| } |
| if (t_opt.__isset.orc_schema) { |
| orc_schema = t_opt.orc_schema; |
| } |
| if (t_opt.__isset.orc_compression_type) { |
| orc_compression_type = t_opt.orc_compression_type; |
| } |
| if (t_opt.__isset.orc_writer_version) { |
| orc_writer_version = t_opt.orc_writer_version; |
| } |
| if (t_opt.__isset.compression_type) { |
| compression_type = t_opt.compression_type; |
| } |
| } |
| }; |
| |
// Result sink buffer size constant: 8 * 4096 = 32768.
// NOTE(review): the unit is not visible in this header; confirm it at the use sites.
constexpr int RESULT_SINK_BUFFER_SIZE = 8 * 4096;
| |
| class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> { |
| ENABLE_FACTORY_CREATOR(ResultSinkLocalState); |
| using Base = PipelineXSinkLocalState<BasicSharedState>; |
| |
| public: |
| ResultSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) |
| : Base(parent, state) {} |
| |
| Status init(RuntimeState* state, LocalSinkStateInfo& info) override; |
| Status open(RuntimeState* state) override; |
| Status close(RuntimeState* state, Status exec_status) override; |
| |
| private: |
| friend class ResultSinkOperatorX; |
| |
| vectorized::VExprContextSPtrs _output_vexpr_ctxs; |
| |
| std::shared_ptr<ResultBlockBufferBase> _sender = nullptr; |
| std::shared_ptr<ResultWriter> _writer = nullptr; |
| |
| RuntimeProfile::Counter* _fetch_row_id_timer = nullptr; |
| RuntimeProfile::Counter* _write_data_timer = nullptr; |
| }; |
| |
| class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> { |
| public: |
| ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, |
| const std::vector<TExpr>& select_exprs, const TResultSink& sink); |
| Status prepare(RuntimeState* state) override; |
| |
| Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; |
| |
| private: |
| friend class ResultSinkLocalState; |
| |
| Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block); |
| const TResultSinkType::type _sink_type; |
| const int _result_sink_buffer_size_rows; |
| // set file options when sink type is FILE |
| std::unique_ptr<ResultFileOptions> _file_opts = nullptr; |
| |
| // Owned by the RuntimeState. |
| const RowDescriptor& _row_desc; |
| |
| // Owned by the RuntimeState. |
| const std::vector<TExpr>& _t_output_expr; |
| vectorized::VExprContextSPtrs _output_vexpr_ctxs; |
| |
| // for fetch data by rowids |
| const TFetchOption _fetch_option; |
| |
| std::shared_ptr<ResultBlockBufferBase> _sender = nullptr; |
| }; |
| |
| } // namespace pipeline |
| #include "common/compile_check_end.h" |
| } // namespace doris |