blob: cef91983ecc60154a9aa06112503135bd4168a4e [file] [log] [blame]
#include "sync.h"
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <glog/logging.h>
#include <rocksdb/write_batch.h>
#include <fcntl.h>
#include <unistd.h>
#include <string>
#include <fstream>
#include "../../src/redis_reply.h"
#include "util.h"
void send_string_to_event(bufferevent *bev, const std::string &data) {
auto output = bufferevent_get_output(bev);
evbuffer_add(output, data.c_str(), data.length());
}
Sync::Sync(Server *srv, Writer *writer, Parser *parser, Kvrocks2redis::Config *config)
: ReplicationThread(config->kvrocks_host, config->kvrocks_port, srv, config->kvrocks_auth),
storage_(srv->storage_),
writer_(writer),
parser_(parser),
config_(config),
sync_state_(kReplConnecting),
psync_steps_(this,
CallbacksStateMachine::CallbackList{
CallbacksStateMachine::CallbackType{
CallbacksStateMachine::WRITE, "psync write", tryPSyncWriteCB
},
CallbacksStateMachine::CallbackType{
CallbacksStateMachine::READ, "psync read", tryPSyncReadCB
},
CallbacksStateMachine::CallbackType{
CallbacksStateMachine::READ, "batch loop", incrementBatchLoopCB
}
}) {
}
/*
* Run connect to kvrocks, and start the following steps
* asynchronously
* - TryPsync
* - - if ok, IncrementBatchLoop
* - - not, parseAllLocalStorage and restart TryPsync when done
*/
void Sync::Start() {
auto s = readNextSeqFromFile(&next_seq_);
if (!s.IsOK()) {
LOG(ERROR) << s.Msg();
return;
}
base_ = event_base_new();
if (base_ == nullptr) {
LOG(ERROR) << "[kvrocks2redis] Failed to create new ev base";
return;
}
LOG(INFO) << "[kvrocks2redis] Start sync the data from kvrocks to redis";
psync_steps_.Start();
auto timer = event_new(base_, -1, EV_PERSIST, EventTimerCB, this);
timeval tmo{1, 0}; // 1 sec
evtimer_add(timer, &tmo);
event_base_dispatch(base_);
event_free(timer);
event_base_free(base_);
}
void Sync::Stop() {
if (stop_flag_) return;
stop_flag_ = true; // Stopping procedure is asynchronous,
// handled by timer
LOG(INFO) << "[kvrocks2redis] Stopped";
}
Sync::CBState Sync::tryPSyncWriteCB(
bufferevent *bev, void *ctx) {
auto self = static_cast<Sync *>(ctx);
const auto seq_str = std::to_string(self->next_seq_);
const auto seq_len_str = std::to_string(seq_str.length());
const auto cmd_str = "*2" CRLF "$5" CRLF "PSYNC" CRLF "$" + seq_len_str +
CRLF + seq_str + CRLF;
send_string_to_event(bev, cmd_str);
self->sync_state_ = kReplSendPSync;
LOG(INFO) << "[kvrocks2redis] Try to use psync, next seq: " << self->next_seq_;
return CBState::NEXT;
}
Sync::CBState Sync::tryPSyncReadCB(bufferevent *bev,
void *ctx) {
char *line;
size_t line_len;
auto self = static_cast<Sync *>(ctx);
auto input = bufferevent_get_input(bev);
line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (strncmp(line, "+OK", 3) != 0) {
if (self->next_seq_ > 0) {
// Ooops, Failed to psync , sync process has been terminated, administrator should be notified
// when full sync is needed, please remove last_next_seq config file, and restart kvrocks2redis
LOG(ERROR) << "[kvrocks2redis] CRITICAL - Failed to psync , administrator confirm needed : ";
self->stop_flag_ = true;
return CBState::QUIT;
}
// PSYNC isn't OK, we should use parseAllLocalStorage
// Switch to parseAllLocalStorage
self->parseKVFromLocalStorage();
LOG(INFO) << "[kvrocks2redis] Failed to psync, switch to parseAllLocalStorage";
LOG(INFO) << line;
free(line);
// Restart psync state machine
return CBState::RESTART;
} else {
// PSYNC is OK, use IncrementBatchLoop
free(line);
LOG(INFO) << "[kvrocks2redis] PSync is ok, start increment batch loop";
return CBState::NEXT;
}
}
Sync::CBState Sync::incrementBatchLoopCB(
bufferevent *bev, void *ctx) {
char *line = nullptr;
size_t line_len = 0;
char *bulk_data = nullptr;
auto self = static_cast<Sync *>(ctx);
self->sync_state_ = kReplConnected;
auto input = bufferevent_get_input(bev);
while (true) {
switch (self->incr_state_) {
case Incr_batch_size:
// Read bulk length
line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
self->incr_bulk_len_ = line_len > 0 ? std::strtoull(line + 1, nullptr, 10) : 0;
free(line);
if (self->incr_bulk_len_ == 0) {
LOG(ERROR) << "[kvrocks2redis] Invalid increment data size";
return CBState::RESTART;
}
self->incr_state_ = Incr_batch_data;
case Incr_batch_data:
// Read bulk data (batch data)
if (self->incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, self->incr_bulk_len_ + 2));
auto bat = rocksdb::WriteBatch(std::string(bulk_data, self->incr_bulk_len_));
int count = bat.Count();
self->parser_->ParseWriteBatch(std::string(bulk_data, self->incr_bulk_len_));
self->updateNextSeq(self->next_seq_ + count);
evbuffer_drain(input, self->incr_bulk_len_ + 2);
self->incr_state_ = Incr_batch_size;
} else {
return CBState::AGAIN;
}
}
}
}
// Check if stop_flag_ is set, when do, tear down kvrocks2redis
void Sync::EventTimerCB(int, int16_t, void *ctx) {
// DLOG(INFO) << "[kvrocks2redis] timer";
auto self = static_cast<Sync *>(ctx);
if (self->stop_flag_) {
LOG(INFO) << "[kvrocks2redis] Stop ev loop";
event_base_loopbreak(self->base_);
self->psync_steps_.Stop();
self->writer_->Stop();
// stop parseAllLocalStorage ?
}
}
void Sync::parseKVFromLocalStorage() {
LOG(INFO) << "[kvrocks2redis] Start parsing kv from the local storage";
for (const auto &iter : config_->tokens) {
auto s = writer_->FlushAll(iter.first);
if (!s.IsOK()) {
LOG(ERROR) << "[kvrocks2redis] Failed to flush all in namespace: " << iter.first
<< ", encounter error: " << s.Msg();
return;
}
}
Status s = parser_->ParseFullDB();
if (!s.IsOK()) {
LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: " << s.Msg();
return;
}
updateNextSeq(storage_->LatestSeq() + 1);
}
Status Sync::updateNextSeq(rocksdb::SequenceNumber seq) {
next_seq_ = seq;
return writeNextSeqToFile(seq);
}
Status Sync::readNextSeqFromFile(rocksdb::SequenceNumber *seq) {
next_seq_fd_ = open(config_->next_seq_file_path.data(), O_RDWR | O_CREAT, 0666);
if (next_seq_fd_ < 0) {
return Status(Status::NotOK, std::string("Failed to open next seq file :") + strerror(errno));
}
*seq = 0;
// 21 + 1 byte, extra one byte for the ending \0
char buf[22];
memset(buf, '\0', sizeof(buf));
if (read(next_seq_fd_, buf, sizeof(buf)) > 0) {
*seq = static_cast<rocksdb::SequenceNumber>(std::stoi(buf));
}
return Status::OK();
}
Status Sync::writeNextSeqToFile(rocksdb::SequenceNumber seq) {
std::string seq_string = std::to_string(seq);
// append to 21 byte (overwrite entire first 21 byte, aka the largest SequenceNumber size )
int append_byte = 21 - seq_string.size();
while (append_byte-- > 0) {
seq_string += " ";
}
seq_string += '\0';
pwrite(next_seq_fd_, seq_string.data(), seq_string.size(), 0);
return Status::OK();
}