// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/BackendService_types.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <stddef.h>
#include <stdint.h>
#include <condition_variable>
#include <future>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/thread_context.h"
#include "util/byte_buffer.h"
#include "util/time.h"
#include "util/uid_util.h"
namespace doris {
namespace io {
class StreamLoadPipe;
} // namespace io
// Kafka-related info
class KafkaLoadInfo {
public:
KafkaLoadInfo(const TKafkaLoadInfo& t_info)
: brokers(t_info.brokers),
topic(t_info.topic),
begin_offset(t_info.partition_begin_offset),
properties(t_info.properties) {
// The offset (begin_offset) sent from FE is the next offset to consume,
// while the offset (cmt_offset) reported by BE to FE is the last consumed
// offset, so we need to subtract 1 here.
for (auto& p : t_info.partition_begin_offset) {
cmt_offset[p.first] = p.second - 1;
}
}
void reset_offset() {
// reset the commit offsets back to begin_offset - 1
for (auto& p : begin_offset) {
cmt_offset[p.first] = p.second - 1;
}
}
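// Offset bookkeeping sketch (illustrative only; the t_info values are made up).
// If FE sends partition_begin_offset = {0: 100}, the constructor records
// cmt_offset = {0: 99}: everything up to 99 is consumed, consumption starts at 100.
//
//   TKafkaLoadInfo t_info;
//   t_info.partition_begin_offset = {{0, 100}};
//   KafkaLoadInfo info(t_info);
//   // info.begin_offset[0] == 100, info.cmt_offset[0] == 99
//   info.reset_offset(); // restores cmt_offset to begin_offset - 1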
public:
std::string brokers;
std::string topic;
// The following members bound a single consuming run;
// consuming finishes as soon as any of them is reached.
int64_t max_interval_s = 5;
int64_t max_batch_rows = 1024;
int64_t max_batch_size = 100 * 1024 * 1024; // 100MB
// partition -> begin offset, inclusive.
std::map<int32_t, int64_t> begin_offset;
// partition -> commit offset, inclusive.
std::map<int32_t, int64_t> cmt_offset;
// custom Kafka property key -> value
std::map<std::string, std::string> properties;
};
class MessageBodySink;
class StreamLoadContext {
ENABLE_FACTORY_CREATOR(StreamLoadContext);
public:
StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
start_millis = UnixMillis();
}
~StreamLoadContext() {
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
}
}
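// Lifecycle sketch (illustrative only; assumes create_shared is generated by
// ENABLE_FACTORY_CREATOR and that need_rollback was set after a successful begin_txn):
//
//   {
//       auto ctx = StreamLoadContext::create_shared(ExecEnv::GetInstance());
//       ctx->need_rollback = true; // txn began but was never committed
//   } // destructor runs here and rolls the txn back exactly once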
std::string data_saved_path;
std::string to_json() const;
std::string prepare_stream_load_record(const std::string& stream_load_record);
static void parse_stream_load_record(const std::string& stream_load_record,
TStreamLoadRecord& stream_load_item);
// The old mini load result format is not the same as stream load's.
// This function exists for compatibility with the old mini load result format.
std::string to_json_for_mini_load() const;
// Return the brief info of this context;
// also include the load source info if detail is true.
std::string brief(bool detail = false) const;
bool is_mow_table() const;
Status allocate_schema_buffer() {
if (_schema_buffer == nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->stream_load_pipe_tracker());
return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer);
}
return Status::OK();
}
ByteBufferPtr schema_buffer() { return _schema_buffer; }
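// Call-pattern sketch (hypothetical caller; `ctx` is a StreamLoadContext*).
// Allocation is lazy and idempotent, so allocate before touching the buffer:
//
//   RETURN_IF_ERROR(ctx->allocate_schema_buffer()); // no-op if already allocated
//   ByteBufferPtr buf = ctx->schema_buffer();       // non-null after the call above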
public:
static const int default_txn_id = -1;
// load type, e.g. ROUTINE LOAD / MANUAL LOAD
TLoadType::type load_type = TLoadType::type::MANUL_LOAD;
// load data source, e.g. KAFKA / RAW
TLoadSourceType::type load_src_type;
// the job this stream load task belongs to,
// set to -1 if there is no job
int64_t job_id = -1;
// id for each load
UniqueId id;
std::string db;
int64_t db_id = -1;
int64_t wal_id = -1;
std::string table;
int64_t table_id = -1;
int64_t schema_version = -1;
std::string label;
std::string sql_str;
// optional
std::string sub_label;
double max_filter_ratio = 0.0;
int32_t timeout_second = -1;
AuthInfo auth;
bool two_phase_commit = false;
std::string load_comment;
// The following members bound a single consuming run; consuming finishes
// as soon as any of them is reached. The values match those set in
// fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java.
int64_t max_interval_s = 60;
int64_t max_batch_rows = 20000000;
int64_t max_batch_size = 1024 * 1024 * 1024; // 1GB
// for parsing JSON data
std::string data_format = "";
std::string jsonpath_file = "";
std::string jsonpath = "";
// only used to check whether we have received the whole body
size_t body_bytes = 0;
size_t receive_bytes = 0;
bool is_chunked_transfer = false;
int64_t txn_id = default_txn_id;
// http stream
bool is_read_schema = true;
std::string txn_operation = "";
bool need_rollback = false;
// When use_streaming is true, we use stream_pipe to send the source data;
// otherwise we save the source data to a file first, then process it.
bool use_streaming = false;
TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN;
TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
bool group_commit = false;
std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;
TStreamLoadPutResult put_result;
TStreamLoadMultiTablePutResult multi_table_put_result;
std::vector<TTabletCommitInfo> commit_infos;
std::promise<Status> load_status_promise;
std::future<Status> load_status_future = load_status_promise.get_future();
Status status;
int64_t number_total_rows = 0;
int64_t number_loaded_rows = 0;
int64_t number_filtered_rows = 0;
int64_t number_unselected_rows = 0;
int64_t loaded_bytes = 0;
int64_t start_millis = 0;
int64_t start_write_data_nanos = 0;
int64_t load_cost_millis = 0;
int64_t begin_txn_cost_nanos = 0;
int64_t stream_load_put_cost_nanos = 0;
int64_t commit_and_publish_txn_cost_nanos = 0;
int64_t pre_commit_txn_cost_nanos = 0;
int64_t read_data_cost_nanos = 0;
int64_t write_data_cost_nanos = 0;
int64_t receive_and_read_data_cost_nanos = 0;
int64_t begin_receive_and_read_data_cost_nanos = 0;
std::string error_url = "";
std::string first_error_msg = "";
// if the label is already in use, the existing job's status is recorded here;
// it should be RUNNING or FINISHED
std::string existing_job_status = "";
std::unique_ptr<KafkaLoadInfo> kafka_info;
// consumer_id is used as the data consumer cache key
// to identify a specific data consumer.
int64_t consumer_id = -1;
// If this is a transactional insert operation, this will be true
bool need_commit_self = false;
// csv with header type
std::string header_type = "";
// is this load single-stream-multi-table?
bool is_multi_table = false;
// for single-stream-multi-table, the list of target tables
std::vector<std::string> table_list;
bool memtable_on_sink_node = false;
// used in cloud cluster mode
std::string qualified_user;
std::string cloud_cluster;
// 1. _can_send_reply: ensure `send_reply` is invoked only after on_header/handle
// completes, avoiding client errors (e.g., broken pipe).
// 2. _finish_send_reply: prevent duplicate reply sending; skip the reply if the
// HTTP request is canceled because the import ran too long.
std::mutex _send_reply_lock;
std::condition_variable _can_send_reply_cv;
bool _can_send_reply = false;
bool _finish_send_reply = false;
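// Handshake sketch (hypothetical; shows the intended protocol only, `ctx` is assumed):
//
//   // worker thread, once on_header/handle completes:
//   {
//       std::lock_guard<std::mutex> l(ctx->_send_reply_lock);
//       ctx->_can_send_reply = true;
//   }
//   ctx->_can_send_reply_cv.notify_one();
//
//   // replying thread:
//   std::unique_lock<std::mutex> l(ctx->_send_reply_lock);
//   ctx->_can_send_reply_cv.wait(l, [&] { return ctx->_can_send_reply; });
//   if (!ctx->_finish_send_reply) {
//       ctx->_finish_send_reply = true; // reply is sent at most once
//       // send_reply(...);
//   }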
public:
ExecEnv* exec_env() { return _exec_env; }
private:
ExecEnv* _exec_env = nullptr;
ByteBufferPtr _schema_buffer;
};
} // namespace doris