// blob: c3719111b89ef4b79f038dc73a8a332946de1228 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <bthread/mutex.h>
#include <gen_cpp/olap_common.pb.h>

#include <atomic>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

#include "brpc/stream.h"
#include "butil/iobuf.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "runtime/load_stream_writer.h"
#include "runtime/workload_management/resource_context.h"
#include "util/runtime_profile.h"
namespace doris {
// Forward declarations: within this header these types are only used through
// raw pointers or smart pointers, so their full definitions are not needed here.
class LoadStreamMgr;
class ThreadPoolToken;
class OlapTableSchemaParam;
// Segment-id remapping table: the vector index is the original segment id and
// the stored element is the new segment id,
// i.e. origin_segid(index) -> new_segid(value in vector).
using SegIdMapping = std::vector<uint32_t>;
// List of failures; each entry pairs a tablet id with the Status that
// describes why that tablet failed.
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
// State kept for a single tablet while a load streams data into it.
//
// Only the three one-line accessors/mutators are defined here; every other
// member function is defined out of line, so the comments below describe what
// the declarations themselves show.
class TabletStream {
public:
    // `id` is the value later returned by id(); `load_stream_mgr` and `profile`
    // are stored as non-owning raw pointers.
    TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr,
                 RuntimeProfile* profile);

    // Second-phase initialisation; reports failure through the returned Status.
    Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
                int64_t partition_id);

    // Both calls receive the decoded stream header together with the raw
    // payload buffer and report their outcome as a Status.
    Status append_data(const PStreamHeader& header, butil::IOBuf* data);
    Status add_segment(const PStreamHeader& header, butil::IOBuf* data);

    // Adds `num_segments` to the running total. This is a plain, non-atomic
    // addition and no lock is taken here.
    // NOTE(review): callers presumably serialise these calls - confirm.
    void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }

    // Clears the `_check_num_segments` flag, which starts out as true.
    void disable_num_segments_check() { _check_num_segments = false; }

    void pre_close();
    Status close();

    // Id given to the constructor.
    int64_t id() const { return _id; }

    friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);

private:
    // Runs `fn` and returns its Status.
    // NOTE(review): the name suggests `fn` is executed on a heavy-work thread
    // pool - confirm against the definition.
    Status _run_in_heavy_work_pool(std::function<Status()> fn);

    // Defaulted to 0 so the field never holds an indeterminate value, matching
    // the other scalar members of this file; a constructor mem-initializer
    // still takes precedence over this default.
    int64_t _id = 0;
    LoadStreamWriterSharedPtr _load_stream_writer;
    std::unique_ptr<ThreadPoolToken> _flush_token;
    // Segment-id remapping tables, one per int64_t key.
    // NOTE(review): the key is presumably the sender/source id - confirm.
    std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
    // Explicitly zeroed: before C++20 a default-constructed std::atomic is
    // left uninitialized.
    std::atomic<uint32_t> _next_segid {0};
    int64_t _num_segments = 0;
    bool _check_num_segments = true;
    bthread::Mutex _lock;
    AtomicStatus _status;
    PUniqueId _load_id;
    // Defaulted for the same reason as `_id`.
    int64_t _txn_id = 0;
    // Non-owning profile pointer and the timers registered for this stream.
    RuntimeProfile* _profile = nullptr;
    RuntimeProfile::Counter* _append_data_timer = nullptr;
    RuntimeProfile::Counter* _add_segment_timer = nullptr;
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
    // Non-owning back pointer to the manager passed to the constructor.
    LoadStreamMgr* _load_stream_mgr = nullptr;
};
// Shared-ownership handle to a TabletStream; IndexStream keeps these in its
// per-tablet map.
using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;
// State kept for one index of a load: it owns a map from tablet id to the
// TabletStream handling that tablet.
//
// All member functions are defined out of line, so the comments below describe
// what the declarations themselves show.
class IndexStream {
public:
    // `load_stream_mgr` and `profile` are stored as non-owning raw pointers;
    // `schema` is shared with the caller.
    IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr,
                RuntimeProfile* profile);

    // Receives the decoded stream header together with the raw payload buffer
    // and reports the outcome as a Status.
    Status append_data(const PStreamHeader& header, butil::IOBuf* data);

    // Closes the index for the given tablets. Results are returned through the
    // two out-parameters: ids of tablets that succeeded, and (tablet id,
    // Status) pairs for those that failed.
    void close(const std::vector<PTabletID>& tablets_to_commit,
               std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

private:
    // Takes the stream handle by non-const reference, so it can (re)assign it.
    // NOTE(review): presumably creates and initialises the TabletStream for
    // `tablet_id` - confirm against the definition.
    void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
                             int64_t partition_id);

    // Defaulted to 0 so the field never holds an indeterminate value, matching
    // the other scalar members of this file; a constructor mem-initializer
    // still takes precedence over this default.
    int64_t _id = 0;
    std::unordered_map<int64_t /*tabletid*/, TabletStreamSharedPtr> _tablet_streams_map;
    bthread::Mutex _lock;
    PUniqueId _load_id;
    // Defaulted for the same reason as `_id`.
    int64_t _txn_id = 0;
    std::shared_ptr<OlapTableSchemaParam> _schema;
    // NOTE(review): presumably tablet id -> partition id - confirm.
    std::unordered_map<int64_t, int64_t> _tablet_partitions;
    // Non-owning profile pointer and the timers registered for this stream.
    RuntimeProfile* _profile = nullptr;
    RuntimeProfile::Counter* _append_data_timer = nullptr;
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
    // Non-owning back pointer to the manager passed to the constructor.
    LoadStreamMgr* _load_stream_mgr = nullptr;
};
// Shared-ownership handle to an IndexStream; LoadStream keeps these in its
// per-index map.
using IndexStreamSharedPtr = std::shared_ptr<IndexStream>;
// Shorthand for the brpc stream identifier used in the brpc callback signatures.
using StreamId = brpc::StreamId;
class LoadStream : public brpc::StreamInputHandler {
public:
LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile);
~LoadStream() override;
Status init(const POpenLoadStreamRequest* request);
void add_source(int64_t src_id) {
std::lock_guard lock_guard(_lock);
_open_streams[src_id]++;
if (_is_incremental) {
_total_streams++;
}
}
void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
// callbacks called by brpc
int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
void on_idle_timeout(StreamId id) override;
void on_closed(StreamId id) override;
friend std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream);
private:
void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr);
void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data);
Status _append_data(const PStreamHeader& header, butil::IOBuf* data);
void _report_result(StreamId stream, const Status& status,
const std::vector<int64_t>& success_tablet_ids,
const FailedTablets& failed_tablets, bool eos);
void _report_schema(StreamId stream, const PStreamHeader& hdr);
// report failure for one message
void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) {
FailedTablets failed_tablets;
if (header.has_tablet_id()) {
failed_tablets.emplace_back(header.tablet_id(), status);
}
_report_result(stream, status, {}, failed_tablets, false);
}
Status _write_stream(StreamId stream, butil::IOBuf& buf);
private:
PUniqueId _load_id;
std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
int32_t _total_streams = 0;
int32_t _close_load_cnt = 0;
std::atomic<int32_t> _close_rpc_cnt = 0;
std::vector<PTabletID> _tablets_to_commit;
bthread::Mutex _lock;
std::unordered_map<int64_t, int32_t> _open_streams;
int64_t _txn_id = 0;
std::shared_ptr<OlapTableSchemaParam> _schema;
bool _enable_profile = false;
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile::Counter* _append_data_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
std::shared_ptr<ResourceContext> _resource_ctx;
bool _is_incremental = false;
};
// Exclusive-ownership handle to a LoadStream.
using LoadStreamPtr = std::unique_ptr<LoadStream>;
} // namespace doris