blob: 4dfe2e252d9e72d5e6aa618077a9b74bc4e5bdab [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <brpc/controller.h>
#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <bthread/types.h>
#include <butil/errno.h>
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>
#include <span>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "core/allocator.h"
#include "core/block/block.h"
#include "core/column/column.h"
#include "core/data_type/data_type.h"
#include "exprs/vexpr_fwd.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_profile.h"
#include "runtime/thread_context.h"
#include "storage/tablet_info.h"
#include "util/countdown_latch.h"
#include "util/debug_points.h"
#include "util/stopwatch.hpp"
namespace doris {
#include "common/compile_check_begin.h"
class TabletSchema;
class LoadStreamStub;
struct SegmentStatistics;
using IndexToTabletSchema = phmap::parallel_flat_hash_map<
int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, std::equal_to<int64_t>,
std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, 4, std::mutex>;
using IndexToEnableMoW =
phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, std::equal_to<int64_t>,
std::allocator<phmap::Pair<const int64_t, bool>>, 4,
std::mutex>;
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
public:
LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub)
: _load_id(load_id), _dst_id(dst_id), _stub(stub) {}
int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
size_t size) override;
void on_idle_timeout(brpc::StreamId id) override {}
void on_closed(brpc::StreamId id) override;
friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler);
private:
PUniqueId _load_id; // for logging
int64_t _dst_id = -1; // for logging
std::weak_ptr<LoadStreamStub> _stub;
};
class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
friend class LoadStreamReplyHandler;
public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false);
LoadStreamStub(UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
: LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {};
// for mock this class in UT
#ifdef BE_TEST
virtual
#endif
~LoadStreamStub();
// open_load_stream
Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
int64_t idle_timeout_ms, bool enable_profile);
// for mock this class in UT
#ifdef BE_TEST
virtual
#endif
// segment_id is limited by max_segment_num_per_rowset (default value of 1000),
// so in practice it will not exceed the range of i16.
// APPEND_DATA
Status
append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int32_t segment_id, uint64_t offset, std::span<const Slice> data,
bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE);
// ADD_SEGMENT
Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int32_t segment_id, const SegmentStatistics& segment_stat);
// CLOSE_LOAD
Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
// GET_SCHEMA
Status get_schema(const std::vector<PTabletID>& tablets);
// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
Status close_finish_check(RuntimeState* state, bool* is_closed);
// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
void cancel(Status reason);
Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t timeout_ms = 60000);
Status wait_for_new_schema(int64_t timeout_ms) {
std::unique_lock<bthread::Mutex> lock(_schema_mutex);
if (timeout_ms > 0) {
int ret = _schema_cv.wait_for(lock, timeout_ms * 1000);
return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout");
}
_schema_cv.wait(lock);
return Status::OK();
};
std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
return (*_tablet_schema_for_index)[index_id];
}
bool enable_unique_mow(int64_t index_id) const {
return _enable_unique_mow_for_index->at(index_id);
}
std::vector<int64_t> success_tablets() {
std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
return _success_tablets;
}
std::unordered_map<int64_t, Status> failed_tablets() {
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
return _failed_tablets;
}
brpc::StreamId stream_id() const { return _stream_id; }
int64_t src_id() const { return _src_id; }
int64_t dst_id() const { return _dst_id; }
bool is_open() const { return _is_open.load(); }
bool is_incremental() const { return _is_incremental; }
friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub);
std::string to_string();
// for tests only
void add_success_tablet(int64_t tablet_id) {
std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
_success_tablets.push_back(tablet_id);
}
void add_failed_tablet(int64_t tablet_id, Status reason) {
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
_failed_tablets[tablet_id] = reason;
}
void add_bytes_written(size_t bytes) {
std::lock_guard<bthread::Mutex> lock(_write_mutex);
_bytes_written += bytes;
}
int64_t bytes_written() {
std::lock_guard<bthread::Mutex> lock(_write_mutex);
return _bytes_written;
}
Status check_cancel() {
DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
{ return Status::InternalError("stream cancelled"); });
if (!_is_cancelled.load()) {
return Status::OK();
}
std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id),
_cancel_st.to_string_no_stack());
}
int64_t get_and_reset_load_back_pressure_version_wait_time_ms() {
return _load_back_pressure_version_wait_time_ms.exchange(0);
}
void _refresh_back_pressure_version_wait_time(
const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
tablet_load_infos);
private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Status _send_with_retry(butil::IOBuf& buf);
void _handle_failure(butil::IOBuf& buf, Status st);
protected:
std::atomic<bool> _is_init;
std::atomic<bool> _is_open;
std::atomic<bool> _is_closing;
std::atomic<bool> _is_closed;
std::atomic<bool> _is_cancelled;
std::atomic<bool> _is_eos;
PUniqueId _load_id;
brpc::StreamId _stream_id;
int64_t _src_id = -1; // source backend_id
int64_t _dst_id = -1; // destination backend_id
Status _status = Status::InternalError<false>("Stream is not open");
Status _cancel_st;
bthread::Mutex _open_mutex;
bthread::Mutex _cancel_mutex;
std::mutex _buffer_mutex;
std::mutex _send_mutex;
butil::IOBuf _buffer;
bthread::Mutex _schema_mutex;
bthread::ConditionVariable _schema_cv;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
bthread::Mutex _success_tablets_mutex;
bthread::Mutex _failed_tablets_mutex;
std::vector<int64_t> _success_tablets;
std::unordered_map<int64_t, Status> _failed_tablets;
bool _is_incremental = false;
bthread::Mutex _write_mutex;
size_t _bytes_written = 0;
std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
};
// a collection of LoadStreams connect to the same node
class LoadStreamStubs {
public:
LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
: _is_incremental(incremental) {
_streams.reserve(num_streams);
for (size_t i = 0; i < num_streams; i++) {
_streams.emplace_back(
new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental));
}
}
Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
int64_t idle_timeout_ms, bool enable_profile);
bool is_incremental() const { return _is_incremental; }
size_t size() const { return _streams.size(); }
// for UT only
void mark_open() { _open_success.store(true); }
std::shared_ptr<LoadStreamStub> select_one_stream() {
if (!_open_success.load()) {
return nullptr;
}
size_t i = _select_index.fetch_add(1);
return _streams[i % _streams.size()];
}
void cancel(Status reason) {
for (auto& stream : _streams) {
stream->cancel(reason);
}
}
Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
for (auto& stream : _streams) {
auto v = stream->success_tablets();
std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
}
return s;
}
std::unordered_map<int64_t, Status> failed_tablets() {
std::unordered_map<int64_t, Status> m;
for (auto& stream : _streams) {
auto v = stream->failed_tablets();
m.insert(v.begin(), v.end());
}
return m;
}
std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
private:
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
std::atomic<bool> _open_success = false;
std::atomic<size_t> _select_index = 0;
const bool _is_incremental;
};
} // namespace doris
#include "common/compile_check_end.h"