// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/BackendService_types.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <stddef.h>
#include <stdint.h>

#include <condition_variable>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/thread_context.h"
#include "util/byte_buffer.h"
#include "util/time.h"
#include "util/uid_util.h"

namespace doris {
namespace io {
class StreamLoadPipe;
} // namespace io

// Kafka related info
class KafkaLoadInfo {
public:
    KafkaLoadInfo(const TKafkaLoadInfo& t_info)
            : brokers(t_info.brokers),
              topic(t_info.topic),
              begin_offset(t_info.partition_begin_offset),
              properties(t_info.properties) {
        // The offset (begin_offset) sent from FE is the starting offset,
        // and the offset (cmt_offset) reported by BE to FE is the last
        // consumed offset, so we need to subtract 1 here.
        for (auto& p : t_info.partition_begin_offset) {
            cmt_offset[p.first] = p.second - 1;
        }
    }

    void reset_offset() {
        // reset the commit offset
        for (auto& p : begin_offset) {
            cmt_offset[p.first] = p.second - 1;
        }
    }
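
    // A minimal sketch (not part of the class) of the begin/commit offset
    // relationship described above, assuming partition 0 starts at offset 100:
    //
    //   TKafkaLoadInfo t_info;
    //   t_info.partition_begin_offset = {{0, 100}};
    //   KafkaLoadInfo info(t_info);
    //   // FE sends the next offset to read (100); BE reports the last offset
    //   // consumed, so the initial committed offset is one less.
    //   DCHECK_EQ(info.cmt_offset[0], 99);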

public:
    std::string brokers;
    std::string topic;

    // The following members control the max progress of a consuming
    // process. If any of them is reached, the consuming will finish.
    int64_t max_interval_s = 5;
    int64_t max_batch_rows = 1024;
    int64_t max_batch_size = 100 * 1024 * 1024; // 100MB

    // partition -> begin offset, inclusive.
    std::map<int32_t, int64_t> begin_offset;
    // partition -> commit offset, inclusive.
    std::map<int32_t, int64_t> cmt_offset;
    // custom Kafka property key -> value
    std::map<std::string, std::string> properties;
};

class MessageBodySink;

class StreamLoadContext {
    ENABLE_FACTORY_CREATOR(StreamLoadContext);

public:
    StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
        start_millis = UnixMillis();
    }

    ~StreamLoadContext() {
        if (need_rollback) {
            _exec_env->stream_load_executor()->rollback_txn(this);
            need_rollback = false;
        }
    }
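
    // A sketch of the assumed need_rollback lifecycle (the actual call sites
    // live in StreamLoadExecutor and the HTTP handlers, not in this header):
    //
    //   RETURN_IF_ERROR(exec_env->stream_load_executor()->begin_txn(ctx));
    //   ctx->need_rollback = true;  // roll back unless the commit succeeds
    //   // ... receive and write data ...
    //   RETURN_IF_ERROR(exec_env->stream_load_executor()->commit_txn(ctx));
    //   ctx->need_rollback = false; // committed; the destructor won't roll back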

    std::string data_saved_path;

    std::string to_json() const;

    std::string prepare_stream_load_record(const std::string& stream_load_record);
    static void parse_stream_load_record(const std::string& stream_load_record,
                                         TStreamLoadRecord& stream_load_item);

    // The old mini load result format is not the same as stream load's.
    // This function is added for compatibility with the old mini load result format.
    std::string to_json_for_mini_load() const;

    // Return the brief info of this context.
    // Also print the load source info if detail is set to true.
    std::string brief(bool detail = false) const;

    bool is_mow_table() const;

    Status allocate_schema_buffer() {
        if (_schema_buffer == nullptr) {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->stream_load_pipe_tracker());
            return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer);
        }
        return Status::OK();
    }
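
    // Usage sketch: allocation is lazy and charged to the stream load pipe
    // memory tracker via the scoped tracker switch above. The writer call
    // below is an assumption for illustration, not an API guaranteed here:
    //
    //   RETURN_IF_ERROR(ctx->allocate_schema_buffer());
    //   ctx->schema_buffer()->put_bytes(data, len); // hypothetical writer call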

    ByteBufferPtr schema_buffer() { return _schema_buffer; }

public:
    static const int default_txn_id = -1;
    // load type, e.g. ROUTINE LOAD / MANUAL LOAD
    TLoadType::type load_type = TLoadType::type::MANUL_LOAD;
    // load data source, e.g. KAFKA / RAW
    TLoadSourceType::type load_src_type;

    // the job this stream load task belongs to,
    // set to -1 if there is no job
    int64_t job_id = -1;

    // id for each load
    UniqueId id;

    std::string db;
    int64_t db_id = -1;
    int64_t wal_id = -1;
    std::string table;
    int64_t table_id = -1;
    int64_t schema_version = -1;
    std::string label;
    std::string sql_str;
    // optional
    std::string sub_label;
    double max_filter_ratio = 0.0;
    int32_t timeout_second = -1;
    AuthInfo auth;
    bool two_phase_commit = false;
    std::string load_comment;

    // The following members control the max progress of a consuming
    // process. If any of them is reached, the consuming will finish.
    // Same as the values set in fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
    int64_t max_interval_s = 60;
    int64_t max_batch_rows = 20000000;
    int64_t max_batch_size = 1024 * 1024 * 1024; // 1GB

    // for parsing JSON data
    std::string data_format = "";
    std::string jsonpath_file = "";
    std::string jsonpath = "";

    // only used to check whether we have received the whole body
    size_t body_bytes = 0;
    size_t receive_bytes = 0;
    bool is_chunked_transfer = false;

    int64_t txn_id = default_txn_id;

    // http stream
    bool is_read_schema = true;

    std::string txn_operation = "";

    bool need_rollback = false;
    // When use_streaming is true, we use stream_pipe to send the source data;
    // otherwise we save the source data to a file first, then process it.
    bool use_streaming = false;
    TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN;
    TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
    bool group_commit = false;

    std::shared_ptr<MessageBodySink> body_sink;
    std::shared_ptr<io::StreamLoadPipe> pipe;

    TStreamLoadPutResult put_result;
    TStreamLoadMultiTablePutResult multi_table_put_result;

    std::vector<TTabletCommitInfo> commit_infos;

    std::promise<Status> load_status_promise;
    std::future<Status> load_status_future = load_status_promise.get_future();
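
    // Coordination sketch (assumed pattern; the call sites are outside this
    // header): the execution side fulfills the promise exactly once when the
    // load finishes, while the waiting side blocks on the paired future:
    //
    //   // executor side:
    //   ctx->load_status_promise.set_value(final_status);
    //   // waiting side:
    //   Status result = ctx->load_status_future.get();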

    Status status;

    int64_t number_total_rows = 0;
    int64_t number_loaded_rows = 0;
    int64_t number_filtered_rows = 0;
    int64_t number_unselected_rows = 0;
    int64_t loaded_bytes = 0;
    int64_t start_millis = 0;
    int64_t start_write_data_nanos = 0;
    int64_t load_cost_millis = 0;
    int64_t begin_txn_cost_nanos = 0;
    int64_t stream_load_put_cost_nanos = 0;
    int64_t commit_and_publish_txn_cost_nanos = 0;
    int64_t pre_commit_txn_cost_nanos = 0;
    int64_t read_data_cost_nanos = 0;
    int64_t write_data_cost_nanos = 0;
    int64_t receive_and_read_data_cost_nanos = 0;
    int64_t begin_receive_and_read_data_cost_nanos = 0;

    std::string error_url = "";
    std::string first_error_msg = "";
    // If the label has already been used, set the existing job's status here;
    // it should be RUNNING or FINISHED.
    std::string existing_job_status = "";

    std::unique_ptr<KafkaLoadInfo> kafka_info;

    // consumer_id is used as the data consumer cache key,
    // to identify a specific data consumer.
    int64_t consumer_id;

    // If this is a transactional insert operation, this will be true.
    bool need_commit_self = false;

    // CSV header type
    std::string header_type = "";

    // Is this load single-stream-multi-table?
    bool is_multi_table = false;

    // For single-stream-multi-table, we keep the table list.
    std::vector<std::string> table_list;

    bool memtable_on_sink_node = false;

    // used for cloud cluster mode
    std::string qualified_user;
    std::string cloud_cluster;

    // 1. _can_send_reply: ensure `send_reply` is invoked only after on_header/handle
    //    completes, to avoid client errors (e.g., broken pipe).
    // 2. _finish_send_reply: prevent duplicate reply sending; skip the reply if the
    //    HTTP request was canceled due to a long import execution time.
    std::mutex _send_reply_lock;
    std::condition_variable _can_send_reply_cv;
    bool _can_send_reply = false;
    bool _finish_send_reply = false;
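
    // Handshake sketch (assumed usage by the HTTP handler; the real call sites
    // are outside this header): the worker flips _can_send_reply under the lock
    // and notifies once on_header/handle completes; the replying side waits for
    // the flag before writing the response and uses _finish_send_reply so the
    // reply is sent at most once:
    //
    //   { // worker side, after on_header/handle completes
    //       std::lock_guard<std::mutex> l(ctx->_send_reply_lock);
    //       ctx->_can_send_reply = true;
    //   }
    //   ctx->_can_send_reply_cv.notify_all();
    //
    //   { // reply side
    //       std::unique_lock<std::mutex> l(ctx->_send_reply_lock);
    //       ctx->_can_send_reply_cv.wait(l, [&] { return ctx->_can_send_reply; });
    //       if (!ctx->_finish_send_reply) {
    //           ctx->_finish_send_reply = true;
    //           // write the HTTP reply here
    //       }
    //   }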

public:
    ExecEnv* exec_env() { return _exec_env; }

private:
    ExecEnv* _exec_env = nullptr;
    ByteBufferPtr _schema_buffer;
};

} // namespace doris