blob: bde0e8dd69d65fda45a36300d1ff3598bf8cccd9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/wal_table.h"
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/http.h>
#include <thrift/protocol/TDebugProtocol.h>
#include "evhttp.h"
#include "http/action/stream_load.h"
#include "http/ev_http_server.h"
#include "http/http_common.h"
#include "http/http_headers.h"
#include "http/utils.h"
#include "io/fs/local_file_system.h"
#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
#include "util/path_util.h"
#include "util/thrift_rpc_helper.h"
#include "vec/exec/format/wal/wal_reader.h"
namespace doris {
// Creates the replay state for one table's WALs. `_stop` starts cleared so replay_wals() runs
// until stop() is called.
WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
        : _exec_env(exec_env),
          _db_id(db_id),
          _table_id(table_id),
          _stop(false) {}
// Nothing to release explicitly; members clean up after themselves.
WalTable::~WalTable() = default;
#ifdef BE_TEST
// Test-only stand-in for the HTTP stream-load response body: when BE_TEST is defined,
// _send_request parses this string instead of issuing a real request (presumably set by
// unit tests).
std::string k_request_line;
#endif
// Outcome of the most recent replay request. true: the WAL must be kept and retried in a later
// round; false: it was loaded (or its label was already used) and _send_request deletes it.
// NOTE(review): this is a single namespace-scope flag written by _send_request and
// http_request_done, so it is shared by every WalTable instance; if replays for different
// tables can run concurrently they would race on it -- confirm the threading model.
bool retry = false;
// Registers WAL file paths for replay. Each new path starts with zero retries, the current time
// as its start timestamp and "not replaying"; paths already present keep their existing state.
void WalTable::add_wals(std::vector<std::string> wals) {
    std::lock_guard<std::mutex> guard(_replay_wal_lock);
    for (const std::string& path : wals) {
        LOG(INFO) << "add replay wal " << path;
        _replay_wal_map.emplace(path, replay_wal_info {0, UnixMillis(), false});
    }
}
// Runs one replay round for this table.
//
// Under the lock it:
//   - moves WALs that exhausted group_commit_replay_wal_retry_num into the tmp directory and
//     forgets them;
//   - collects WALs whose backoff has elapsed (see _need_replay).
// If any WAL is still marked as replaying, the round is skipped. Otherwise the collected WALs are
// replayed in sorted order without holding the lock, stopping at the first failure or when
// stop() was requested.
//
// @return always OK; individual replay failures are logged and retried in later rounds.
Status WalTable::replay_wals() {
    std::vector<std::string> need_replay_wals;
    std::vector<std::string> need_erase_wals;
    {
        std::lock_guard<std::mutex> lock(_replay_wal_lock);
        if (_replay_wal_map.empty()) {
            return Status::OK();
        }
        VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id
                   << ", wal size=" << _replay_wal_map.size();
        bool has_replaying_wal = false;
        for (auto& [wal, info] : _replay_wal_map) {
            auto& [retry_num, start_ts, replaying] = info;
            if (replaying) {
                LOG(INFO) << wal << " is replaying, skip this round";
                has_replaying_wal = true;
                break;
            }
            if (retry_num >= config::group_commit_replay_wal_retry_num) {
                LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id
                             << ", wal=" << wal
                             << ", retry_num=" << config::group_commit_replay_wal_retry_num;
                std::string rename_path = _get_tmp_path(wal);
                LOG(INFO) << "rename wal from " << wal << " to " << rename_path;
                if (std::rename(wal.c_str(), rename_path.c_str()) != 0) {
                    LOG(WARNING) << "fail to rename wal from " << wal << " to " << rename_path;
                }
                need_erase_wals.push_back(wal);
                continue;
            }
            if (_need_replay(info)) {
                need_replay_wals.push_back(wal);
            }
        }
        // Forget the WALs that were just moved to tmp even when this round is skipped;
        // otherwise the next round would try to rename the (already moved) file again.
        for (const auto& wal : need_erase_wals) {
            if (_replay_wal_map.erase(wal)) {
                LOG(INFO) << "erase wal " << wal << " from _replay_wal_map";
            } else {
                LOG(WARNING) << "fail to erase wal " << wal << " from _replay_wal_map";
            }
        }
        if (has_replaying_wal) {
            return Status::OK();
        }
        std::sort(need_replay_wals.begin(), need_replay_wals.end());
    }
    for (const auto& wal : need_replay_wals) {
        {
            std::lock_guard<std::mutex> lock(_replay_wal_lock);
            if (_stop.load()) {
                break;
            }
            auto it = _replay_wal_map.find(wal);
            if (it != _replay_wal_map.end()) {
                auto& [retry_num, start_time, replaying] = it->second;
                replaying = true;
            }
        }
        auto st = _replay_wal_internal(wal);
        if (!st.ok()) {
            // Clear the in-flight flag so stop() does not wait forever and a later round retries.
            std::lock_guard<std::mutex> lock(_replay_wal_lock);
            auto it = _replay_wal_map.find(wal);
            if (it != _replay_wal_map.end()) {
                auto& [retry_num, start_time, replaying] = it->second;
                replaying = false;
            }
            LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id
                         << ", table=" << _table_id << ", wal=" << wal << ", st=" << st.to_string();
            break;
        }
        VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id << ", label=" << wal
                    << ", st=" << st.to_string();
    }
    return Status::OK();
}
// Builds the path a WAL is moved to once it has exhausted its replay retries.
//
// The last three '/'-separated components of `wal` are joined with '_' and placed under a `tmp/`
// directory; everything before them is kept as the base directory. For example, assuming
// string_split yields the plain components, "a/b/c/d/e" maps to "a/b/tmp/c_d_e".
//
// Paths with fewer than three components previously underflowed `size() - 3` (size_t) and read
// out of bounds; they now simply get all their components flattened under "tmp/".
std::string WalTable::_get_tmp_path(const std::string wal) {
    std::vector<std::string> path_element;
    doris::vectorized::WalReader::string_split(wal, "/", path_element);
    const size_t element_count = path_element.size();
    const size_t prefix_count = element_count > 3 ? element_count - 3 : 0;
    std::stringstream ss;
    size_t index = 0;
    for (; index < prefix_count; ++index) {
        ss << path_element[index] << "/";
    }
    ss << "tmp/";
    for (; index < element_count; ++index) {
        ss << path_element[index];
        if (index + 1 < element_count) {
            ss << "_";
        }
    }
    return ss.str();
}
// Exponential backoff check. A WAL becomes eligible again once at least
// 2^retry_num * group_commit_replay_wal_retry_interval_seconds seconds have passed since its
// start timestamp. Unit-test builds (BE_TEST) always replay.
bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) {
#ifdef BE_TEST
    return true;
#else
    const auto& [retry_num, start_ts, replaying] = info;
    const auto elapsed_ms = UnixMillis() - start_ts;
    const auto backoff_ms =
            pow(2, retry_num) * config::group_commit_replay_wal_retry_interval_seconds * 1000;
    return elapsed_ms >= backoff_ms;
#endif
}
// Asks the FE master to roll back load transaction `wal_id` of database `db_id`.
// _replay_wal_internal calls it (best effort) before re-sending a WAL.
//
// @return the RPC error if the call itself failed. Otherwise the status reported by FE.
//         Previously an RPC failure was hidden: the untouched, default `result.status` was
//         returned instead.
Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) {
    TLoadTxnRollbackRequest request;
    request.__set_auth_code(0); // this is a fake, fe not check it now
    request.__set_db_id(db_id);
    request.__set_txnId(wal_id);
    std::string reason = "relay wal " + std::to_string(wal_id);
    request.__set_reason(reason);
    TLoadTxnRollbackResult result;
    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
    auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->loadTxnRollback(result, request);
            },
            10000L);
    auto result_status = Status::create(result.status);
    LOG(INFO) << "abort txn " << wal_id << ",st:" << st << ",result_status:" << result_status;
    if (!st.ok()) {
        // `result` was never filled in, so its status says nothing about the rollback.
        return st;
    }
    return result_status;
}
// Replays a single WAL file.
//
// Steps:
//   1. Counts the attempt and marks the WAL as replaying. Returns OK without doing anything if
//      the WAL is no longer in the replay map.
//   2. Extracts the wal id and label from the file name.
//   3. Outside unit tests, aborts the WAL's transaction (best effort) and refreshes the column
//      mapping.
//   4. Sends the replay request.
//
// @param wal full path of the WAL file.
// @return the first error from parsing, column lookup or sending; OK otherwise.
Status WalTable::_replay_wal_internal(const std::string& wal) {
    LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal;
    // start a new stream load
    {
        std::lock_guard<std::mutex> guard(_replay_wal_lock);
        auto entry = _replay_wal_map.find(wal);
        if (entry == _replay_wal_map.end()) {
            LOG(WARNING) << "can not find wal in stream load replay map. db=" << _db_id
                         << ", table=" << _table_id << ", wal=" << wal;
            return Status::OK();
        }
        auto& [retry_num, start_time, replaying] = entry->second;
        ++retry_num;
        replaying = true;
    }
    std::shared_ptr<std::pair<int64_t, std::string>> wal_info = nullptr;
    RETURN_IF_ERROR(_get_wal_info(wal, wal_info));
    const int64_t wal_id = wal_info->first;
    const std::string label = wal_info->second;
#ifndef BE_TEST
    // Best effort: a failed abort is only logged.
    if (auto abort_st = _abort_txn(_db_id, wal_id); !abort_st.ok()) {
        LOG(WARNING) << "abort txn " << wal_id << " fail";
    }
    // Refresh the column id -> name/index maps that _send_request uses to build the SQL.
    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
#endif
    return _send_request(wal_id, wal, label);
}
// Splits the WAL file name (the last '/'-separated component of `wal`) into the numeric wal id
// before the first '_' and the label after it, e.g. ".../123_my_label" -> {123, "my_label"}.
//
// @param wal  full WAL path.
// @param pair out: set to {wal_id, label} on success; left untouched on error.
// @return InvalidArgument if the path is empty, the file name has no '_', or the part before it
//         is not a number. Previously these cases read out of bounds, or silently produced
//         wal_id 0 with the whole name as the label, because strtoll never throws.
Status WalTable::_get_wal_info(const std::string& wal,
                               std::shared_ptr<std::pair<int64_t, std::string>>& pair) {
    std::vector<std::string> path_element;
    doris::vectorized::WalReader::string_split(wal, "/", path_element);
    if (path_element.empty()) {
        return Status::InvalidArgument("Invalid format, empty wal path: {}", wal);
    }
    const std::string& file_name = path_element.back();
    const auto pos = file_name.find('_');
    if (pos == std::string::npos) {
        return Status::InvalidArgument("Invalid format, no label in wal file name: {}", wal);
    }
    const std::string id_str = file_name.substr(0, pos);
    char* end = nullptr;
    const int64_t wal_id = std::strtoll(id_str.c_str(), &end, 10);
    if (id_str.empty() || end == nullptr || *end != '\0') {
        return Status::InvalidArgument("Invalid format, bad wal id in wal file name: {}", wal);
    }
    std::string label = file_name.substr(pos + 1);
    pair = std::make_shared<std::pair<int64_t, std::string>>(wal_id, std::move(label));
    return Status::OK();
}
// libevent completion callback for the replay HTTP request sent by WalTable::_send_request.
//
// Reads the JSON response body and sets the global `retry` flag:
//   - false on a non-"Fail" status, or on a "Label ... has already been used" failure;
//   - true on any other failure, an empty or malformed body, or a null request.
// It then breaks the event loop whose event_base is passed as `arg`.
void http_request_done(struct evhttp_request* req, void* arg) {
    if (req == nullptr) {
        // libevent reports connection failures and timeouts by invoking the callback with a null
        // request. Nothing was loaded, so the WAL must be retried. Leaving `retry` false here
        // would make _send_request delete the WAL as if it had been replayed.
        LOG(WARNING) << "req is null";
        retry = true;
    } else {
        std::string out_str;
        size_t len = 0;
        auto input = evhttp_request_get_input_buffer(req);
        char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
        while (request_line != nullptr) {
            out_str.append(request_line);
            // evbuffer_readln hands back a heap-allocated line owned by the caller.
            free(request_line);
            request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
        }
        LOG(INFO) << "replay wal out_str:" << out_str;
        if (out_str.empty()) {
            retry = true;
        } else {
            rapidjson::Document doc;
            doc.Parse(out_str.c_str());
            // Guard every access: operator[] on a missing member or a non-object is undefined.
            if (doc.HasParseError() || !doc.IsObject() || !doc.HasMember("Status") ||
                !doc["Status"].IsString()) {
                LOG(WARNING) << "invalid replay wal response:" << out_str;
                retry = true;
            } else {
                std::string status = doc["Status"].GetString();
                std::string msg;
                if (doc.HasMember("Message") && doc["Message"].IsString()) {
                    msg = doc["Message"].GetString();
                }
                LOG(INFO) << "replay wal status:" << status << ",msg:" << msg;
                if (status.find("Fail") != std::string::npos) {
                    // A "Label ... has already been used" failure is treated as done (not
                    // retried); presumably an earlier attempt already loaded this WAL.
                    const bool label_used = msg.find("Label") != std::string::npos &&
                                            msg.find("has already been used") != std::string::npos;
                    retry = !label_used;
                } else {
                    retry = false;
                }
            }
        }
    }
    if (arg != nullptr) {
        event_base_loopbreak(static_cast<struct event_base*>(arg));
    } else {
        LOG(WARNING) << "arg is null";
    }
}
// Replays one WAL by sending an `insert into ... select ... from http_stream(...)` PUT request to
// this BE's own HTTP server. Depending on the outcome recorded in the global `retry` flag, it
// then either:
//   - deletes the WAL (success), or
//   - marks it as not replaying so a later round retries it.
//
// @param wal_id id parsed from the WAL file name; also sent in the auth-code and wal-id headers.
// @param wal    full path of the WAL file.
// @param label  load label parsed from the WAL file name.
// @return OK once the outcome has been recorded. An error if:
//         - the WAL header cannot be read,
//         - the HTTP client cannot be set up, or
//         - deleting a successfully replayed WAL fails.
Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) {
#ifndef BE_TEST
    // Read the WAL header and build the column list and SQL before any libevent object is
    // allocated, so early error returns cannot leak them.
    std::string columns;
    RETURN_IF_ERROR(_read_wal_header(wal, columns));
    std::vector<std::string> column_id_element;
    doris::vectorized::WalReader::string_split(columns, ",", column_id_element);
    std::vector<size_t> index_vector;
    std::stringstream ss_name;
    std::stringstream ss_id;
    size_t index_raw = 0;
    for (const auto& column_id_str : column_id_element) {
        int64_t column_id = std::strtoll(column_id_str.c_str(), nullptr, 10);
        auto name_it = _column_id_name_map.find(column_id);
        auto index_it = _column_id_index_map.find(column_id);
        if (name_it != _column_id_name_map.end() && index_it != _column_id_index_map.end()) {
            ss_name << "`" << name_it->second << "`,";
            ss_id << "c" << std::to_string(index_it->second) << ",";
            index_vector.emplace_back(index_raw);
            // Consume the entry so a repeated column id in the header is only mapped once.
            _column_id_name_map.erase(name_it);
            _column_id_index_map.erase(index_it);
        }
        index_raw++;
    }
    // Drop the trailing ',' (no-op when no column matched).
    std::string name = ss_name.str();
    if (!name.empty()) {
        name.pop_back();
    }
    std::string id = ss_id.str();
    if (!id.empty()) {
        id.pop_back();
    }
    std::stringstream ss;
    ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
       << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
       << std::to_string(_table_id) << "\")";
    const std::string sql = ss.str();

    // Use a private event base for this request. The old event_init() call allocated a new
    // process-global base on every replay and never freed it. evhttp_connection_base_new binds
    // the connection to `base` directly, so no global base is needed.
    struct event_base* base = event_base_new();
    if (base == nullptr) {
        return Status::InternalError("fail to create event base to replay wal {}", wal);
    }
    struct evhttp_connection* conn =
            evhttp_connection_base_new(base, nullptr, "127.0.0.1", doris::config::webserver_port);
    if (conn == nullptr) {
        event_base_free(base);
        return Status::InternalError("fail to create http connection to replay wal {}", wal);
    }
    struct evhttp_request* req = evhttp_request_new(http_request_done, base);
    if (req == nullptr) {
        evhttp_connection_free(conn);
        event_base_free(base);
        return Status::InternalError("fail to create http request to replay wal {}", wal);
    }
    struct evkeyvalq* headers = evhttp_request_get_output_headers(req);
    evhttp_add_header(headers, HTTP_LABEL_KEY.c_str(), label.c_str());
    evhttp_add_header(headers, HTTP_AUTH_CODE.c_str(), std::to_string(wal_id).c_str());
    evhttp_add_header(headers, HTTP_WAL_ID_KY.c_str(), std::to_string(wal_id).c_str());
    evhttp_add_header(headers, HTTP_SQL.c_str(), sql.c_str());
    evbuffer* output = evhttp_request_get_output_buffer(req);
    evbuffer_add_printf(output, "replay wal %s", std::to_string(wal_id).c_str());
    _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
    evhttp_connection_set_timeout(conn, 300);
    retry = false;
    if (evhttp_make_request(conn, req, EVHTTP_REQ_PUT, "/api/_http_stream") != 0) {
        // libevent owns `req` after this call even on failure, so it is not touched again.
        // Nothing was replayed: keep the WAL for a later round instead of deleting it.
        LOG(WARNING) << "fail to make replay wal request, wal=" << wal;
        retry = true;
    } else {
        // Normally returns once http_request_done breaks the loop.
        event_base_dispatch(base);
    }
    evhttp_connection_free(conn);
    event_base_free(base);
#else
    std::stringstream out;
    out << k_request_line;
    auto out_str = out.str();
    rapidjson::Document doc;
    doc.Parse(out_str.c_str());
    auto status = std::string(doc["Status"].GetString());
    if (status.find("Fail") != status.npos) {
        retry = true;
    } else {
        retry = false;
    }
#endif
    // The request has finished. Release the column index registered for it before any of the
    // error returns below, which previously skipped this cleanup.
    _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
    if (retry) {
        LOG(INFO) << "fail to replay wal =" << wal;
        std::lock_guard<std::mutex> lock(_replay_wal_lock);
        auto it = _replay_wal_map.find(wal);
        if (it != _replay_wal_map.end()) {
            auto& [retry_num, start_time, replaying] = it->second;
            replaying = false;
        } else {
            _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false});
        }
    } else {
        LOG(INFO) << "success to replay wal =" << wal;
        RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
        RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
        std::lock_guard<std::mutex> lock(_replay_wal_lock);
        if (_replay_wal_map.erase(wal)) {
            LOG(INFO) << "erase " << wal << " from _replay_wal_map";
        } else {
            LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map";
        }
    }
    return Status::OK();
}
void WalTable::stop() {
bool done = true;
do {
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (!this->_stop.load()) {
this->_stop.store(true);
}
auto it = _replay_wal_map.begin();
for (; it != _replay_wal_map.end(); it++) {
auto& [retry_num, start_time, replaying] = it->second;
if (replaying) {
break;
}
}
if (it != _replay_wal_map.end()) {
done = false;
} else {
done = true;
}
}
if (!done) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
} while (!done);
}
// Number of WALs currently tracked for replay (read under the replay lock).
size_t WalTable::size() {
    std::unique_lock<std::mutex> guard(_replay_wal_lock);
    const size_t tracked = _replay_wal_map.size();
    return tracked;
}
// Fetches the table's columns from the FE master and rebuilds the two lookup maps used by
// _send_request:
//   - column id -> column name;
//   - column id -> 1-based position in FE's list.
// FE returns the columns as "name:id,name:id,...".
//
// @return InternalError if no FE master is known yet; the RPC error or FE-reported error if the
//         call fails; InvalidArgument for a malformed "name:id" entry; OK otherwise.
Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
    TGetColumnInfoRequest request;
    request.__set_db_id(db_id);
    request.__set_table_id(tb_id);
    TGetColumnInfoResult result;
    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
    if (master_addr.hostname.empty() || master_addr.port == 0) {
        return Status::InternalError("Have not get FE Master heartbeat yet");
    }
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->getColumnInfo(result, request);
            }));
    // Report FE-side failures first; column_info may not be meaningful in that case.
    Status status = Status::create(result.status);
    if (!status.ok()) {
        return status;
    }
    std::vector<std::string> column_element;
    doris::vectorized::WalReader::string_split(result.column_info, ",", column_element);
    int64_t column_index = 1;
    _column_id_name_map.clear();
    _column_id_index_map.clear();
    for (const auto& column : column_element) {
        if (column.empty()) {
            continue;
        }
        // The id is the numeric suffix, so split at the last ':'.
        const auto pos = column.rfind(':');
        if (pos == std::string::npos) {
            return Status::InvalidArgument("Invalid format, column info: {}", column);
        }
        auto column_name = column.substr(0, pos);
        const std::string id_str = column.substr(pos + 1);
        char* end = nullptr;
        const int64_t column_id = std::strtoll(id_str.c_str(), &end, 10);
        // Previously a malformed entry silently became column id 0.
        if (id_str.empty() || end == nullptr || *end != '\0') {
            return Status::InvalidArgument("Invalid format, column info: {}", column);
        }
        _column_id_name_map.emplace(column_id, column_name);
        _column_id_index_map.emplace(column_id, column_index);
        column_index++;
    }
    return status;
}
// Opens the WAL at `wal_path` through the WAL manager and reads its header. The header's column
// list is stored into `columns`; the header version is read but discarded. The reader is then
// finalized.
Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) {
    std::shared_ptr<doris::WalReader> reader;
    RETURN_IF_ERROR(_exec_env->wal_mgr()->create_wal_reader(wal_path, reader));
    uint32_t header_version = 0;
    RETURN_IF_ERROR(reader->read_header(header_version, columns));
    RETURN_IF_ERROR(reader->finalize());
    return Status::OK();
}
} // namespace doris