blob: 31164222a9517d500746fd9c7d1cf2df40e43584 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include "common/status.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/uid_util.h"
namespace doris {
class PTabletWriterOpenRequest;
class PTabletWriterAddBlockRequest;
class PTabletWriterAddBlockResult;
class OpenPartitionRequest;
class BaseTabletsChannel;
// A LoadChannel manages tablets channels for all indexes
// corresponding to a certain load job
class LoadChannel {
public:
LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
std::string sender_ip, int64_t backend_id, bool enable_profile, int64_t wg_id);
~LoadChannel();
// open a new load channel if not exist
Status open(const PTabletWriterOpenRequest& request);
// this batch must belong to a index in one transaction
Status add_batch(const PTabletWriterAddBlockRequest& request,
PTabletWriterAddBlockResult* response);
// return true if this load channel has been opened and all tablets channels are closed then.
bool is_finished();
Status cancel();
time_t last_updated_time() const { return _last_updated_time.load(); }
const UniqueId& load_id() const { return _load_id; }
int64_t timeout() const { return _timeout_s; }
bool is_high_priority() const { return _is_high_priority; }
WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); }
RuntimeProfile::Counter* get_mgr_add_batch_timer() { return _mgr_add_batch_timer; }
RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; }
protected:
Status _get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel, bool& is_finished,
int64_t index_id);
Status _handle_eos(BaseTabletsChannel* channel, const PTabletWriterAddBlockRequest& request,
PTabletWriterAddBlockResult* response);
void _init_profile();
// thread safety
void _report_profile(PTabletWriterAddBlockResult* response);
private:
UniqueId _load_id;
int64_t _txn_id = 0;
SpinLock _profile_serialize_lock;
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile* _self_profile = nullptr;
RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
RuntimeProfile::Counter* _add_batch_timer = nullptr;
RuntimeProfile::Counter* _add_batch_times = nullptr;
RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
RuntimeProfile::Counter* _handle_mem_limit_timer = nullptr;
RuntimeProfile::Counter* _handle_eos_timer = nullptr;
// lock protect the tablets channel map
std::mutex _lock;
// index id -> tablets channel
std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> _tablets_channels;
// index id -> (received rows, filtered rows)
std::unordered_map<int64_t, std::pair<size_t, size_t>> _tablets_channels_rows;
std::mutex _tablets_channels_lock;
// This is to save finished channels id, to handle the retry request.
std::unordered_set<int64_t> _finished_channel_ids;
// set to true if at least one tablets channel has been opened
bool _opened = false;
std::shared_ptr<ResourceContext> _resource_ctx;
std::atomic<time_t> _last_updated_time;
// the timeout of this load job.
// Timed out channels will be periodically deleted by LoadChannelMgr.
int64_t _timeout_s;
// true if this is a high priority load task
bool _is_high_priority = false;
// the ip where tablet sink locate
std::string _sender_ip;
int64_t _backend_id;
bool _enable_profile;
};
inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
os << "LoadChannel(id=" << load_channel.load_id()
<< ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time())
<< ", is high priority: " << load_channel.is_high_priority() << ")";
return os;
}
} // namespace doris