blob: 2b6f4c86cc2453e338aa63a9da7ec5e95b35a9eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#pragma once
#include <event2/bufferevent.h>

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#include "event_util.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
class Server;
// Progress of the replica-side replication handshake, in the order the steps
// are declared. Values are spelled out explicitly (starting at 1) so the
// numbering is visible at a glance; they match the implicit sequence exactly.
enum ReplState {
  kReplConnecting = 1,  // opening the connection
  kReplSendAuth = 2,    // authenticating
  kReplCheckDBName = 3,
  kReplReplConf = 4,
  kReplSendPSync = 5,
  kReplFetchMeta = 6,   // full sync: fetching backup metadata
  kReplFetchSST = 7,    // full sync: fetching SST files
  kReplConnected = 8,
  kReplError = 9,
};
// Classification of a replicated WriteBatch, as reported by
// WriteBatchHandler::Type(). kBatchTypeNone is the "not classified" default.
enum WriteBatchType {
  kBatchTypeNone = 0,
  kBatchTypePublish = 1,
  kBatchTypePropagate = 2,
  kBatchTypeStream = 3,
};
// Callback handed to ReplicationThread::fetchFile / fetchFiles. The arguments
// are presumably the fetched file's name and its CRC — confirm against the
// implementation. The string is taken by const reference: a top-level `const`
// on a by-value parameter is dropped from the function type anyway, so the old
// `const std::string` only forced a copy per call. Callables that take the
// string by value still convert to this type, so existing lambdas keep working.
using FetchFileCallback = std::function<void(const std::string &, uint32_t)>;
/*
 * Feeds replication data to a single replica connection from a dedicated
 * thread (t_). Takes ownership of the connection passed to the constructor and
 * starts from sequence number `next_repl_seq`.
 *
 * NOTE(review): the destructor is defaulted, and destroying a joinable
 * std::thread calls std::terminate. Presumably callers Stop()/Join() first —
 * confirm in the implementation.
 */
class FeedSlaveThread {
 public:
  explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq)
      : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq) {}
  ~FeedSlaveThread() = default;

  Status Start();
  void Stop();
  void Join();

  // True once the stop flag has been raised.
  bool IsStopped() const { return stop_; }

  // Non-owning view of the replica connection; ownership stays with this object.
  redis::Connection *GetConn() const { return conn_.get(); }

  // One less than the next sequence to send (presumably the last sequence
  // already fed to the replica). Returns 0 instead of wrapping when the next
  // sequence is 0.
  rocksdb::SequenceNumber GetCurrentReplSeq() const {
    auto seq = next_repl_seq_.load();
    return seq == 0 ? 0 : seq - 1;
  }

 private:
  uint64_t interval_ = 0;  // NOTE(review): meaning/units not visible here — confirm in loop()
  std::atomic<bool> stop_ = false;
  Server *srv_ = nullptr;
  std::unique_ptr<redis::Connection> conn_ = nullptr;
  std::atomic<rocksdb::SequenceNumber> next_repl_seq_ = 0;
  std::thread t_;
  std::unique_ptr<rocksdb::TransactionLogIterator> iter_ = nullptr;

  // constexpr static data members are implicitly inline in C++17, so they may
  // be ODR-used (e.g. bound to std::min's const& parameters) without an
  // out-of-class definition; plain `static const` members may not.
  static constexpr size_t kMaxDelayUpdates = 16;
  static constexpr size_t kMaxDelayBytes = 16 * 1024;

  void loop();
  void checkLivenessIfNeed();
};
/*
 * Replica-side replication driver. It talks to the node at host:port and walks
 * through the ReplState steps: auth, DB-name check, REPLCONF and PSYNC. When
 * required, it performs a full sync that fetches backup metadata and SST files.
 * The asynchronous steps are expressed as CallbacksStateMachine handler lists
 * (psync_steps_, fullsync_steps_).
 */
class ReplicationThread : private EventCallbackBase<ReplicationThread> {
 public:
  explicit ReplicationThread(std::string host, uint32_t port, Server *srv);
  Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
  void Stop();

  // Current handshake state; relaxed load, so it is only a progress snapshot.
  ReplState State() const { return repl_state_.load(std::memory_order_relaxed); }
  // Last recorded I/O time; relaxed load.
  time_t LastIOTime() const { return last_io_time_.load(std::memory_order_relaxed); }
  void TimerCB(int, int16_t);

 protected:
  event_base *base_ = nullptr;

  // The state machine to manage the asynchronous steps used in replication.
  // Each handler is (event type, name, callback); the callback's returned
  // State tells the machine how to proceed.
  class CallbacksStateMachine {
   public:
    enum class State {
      NEXT,
      PREV,
      AGAIN,
      QUIT,
      RESTART,
    };
    enum EventType {
      READ,
      WRITE,
    };

    using CallbackType = std::tuple<EventType, std::string, std::function<State(bufferevent *, void *)>>;
    using CallbackList = std::deque<CallbackType>;

    CallbacksStateMachine(ReplicationThread *repl, CallbackList &&handlers)
        : repl_(repl), handlers_(std::move(handlers)) {}

    void Start();
    void Stop();
    static void EvCallback(bufferevent *bev, void *ctx);
    static void ConnEventCB(bufferevent *bev, int16_t events, void *state_machine_ptr);
    static void SetReadCB(bufferevent *bev, bufferevent_data_cb cb, void *state_machine_ptr);
    static void SetWriteCB(bufferevent *bev, bufferevent_data_cb cb, void *state_machine_ptr);

   private:
    bufferevent *bev_ = nullptr;
    ReplicationThread *repl_;
    CallbackList handlers_;
    CallbackList::size_type handler_idx_ = 0;

    // Read-only accessors for the idx-th handler tuple.
    EventType getHandlerEventType(CallbackList::size_type idx) const { return std::get<0>(handlers_[idx]); }
    std::string getHandlerName(CallbackList::size_type idx) const { return std::get<1>(handlers_[idx]); }
    std::function<State(bufferevent *, void *)> getHandlerFunc(CallbackList::size_type idx) const {
      return std::get<2>(handlers_[idx]);
    }
  };

 private:
  std::thread t_;
  std::atomic<bool> stop_flag_ = false;
  std::string host_;
  uint32_t port_ = 0;
  Server *srv_ = nullptr;
  engine::Storage *storage_ = nullptr;
  // std::atomic's default constructor leaves the value indeterminate before
  // C++20. Start from the first handshake state; a constructor mem-initializer
  // still overrides this default.
  std::atomic<ReplState> repl_state_{kReplConnecting};
  std::atomic<time_t> last_io_time_ = 0;
  bool next_try_old_psync_ = false;
  bool next_try_without_announce_ip_address_ = false;
  std::function<void()> pre_fullsync_cb_;
  std::function<void()> post_fullsync_cb_;

  // Internal states managed by FullSync procedure
  enum FullSyncState {
    kFetchMetaID,
    kFetchMetaSize,
    kFetchMetaContent,
  } fullsync_state_ = kFetchMetaID;
  rocksdb::BackupID fullsync_meta_id_ = 0;
  size_t fullsync_filesize_ = 0;

  // Internal states managed by IncrementBatchLoop procedure
  enum IncrementBatchLoopState {
    Incr_batch_size,
    Incr_batch_data,
  } incr_state_ = Incr_batch_size;
  size_t incr_bulk_len_ = 0;

  using CBState = CallbacksStateMachine::State;
  CallbacksStateMachine psync_steps_;
  CallbacksStateMachine fullsync_steps_;

  void run();

  static CBState authWriteCB(bufferevent *bev, void *ctx);
  static CBState authReadCB(bufferevent *bev, void *ctx);
  static CBState checkDBNameWriteCB(bufferevent *bev, void *ctx);
  static CBState checkDBNameReadCB(bufferevent *bev, void *ctx);
  static CBState replConfWriteCB(bufferevent *bev, void *ctx);
  static CBState replConfReadCB(bufferevent *bev, void *ctx);
  static CBState tryPSyncWriteCB(bufferevent *bev, void *ctx);
  static CBState tryPSyncReadCB(bufferevent *bev, void *ctx);
  static CBState incrementBatchLoopCB(bufferevent *bev, void *ctx);
  static CBState fullSyncWriteCB(bufferevent *bev, void *ctx);
  static CBState fullSyncReadCB(bufferevent *bev, void *ctx);

  // Synchronized-Blocking ops
  Status sendAuth(int sock_fd);
  Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string &file, uint32_t crc,
                   const FetchFileCallback &fn);
  Status fetchFiles(int sock_fd, const std::string &dir, const std::vector<std::string> &files,
                    const std::vector<uint32_t> &crcs, const FetchFileCallback &fn);
  Status parallelFetchFile(const std::string &dir, const std::vector<std::pair<std::string, uint32_t>> &files);
  static bool isRestoringError(const char *err);
  static bool isWrongPsyncNum(const char *err);
  static bool isUnknownOption(const char *err);

  Status parseWriteBatch(const std::string &batch_string);
};
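/*
 * Extracts the update carried by a raw rocksdb WriteBatch: the key/value pair
 * and the batch classification (WriteBatchType), exposed via Key(), Value() and Type().
 */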
class WriteBatchHandler : public rocksdb::WriteBatch::Handler {
 public:
  // Defined out of line; presumably records the key/value into kv_ and sets
  // type_ — confirm in the implementation.
  rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice &key, const rocksdb::Slice &value) override;

  // Deletions are accepted and ignored: nothing is extracted from them.
  // Parameter names are commented out because they are intentionally unused.
  rocksdb::Status DeleteCF(uint32_t /*column_family_id*/, const rocksdb::Slice & /*key*/) override {
    return rocksdb::Status::OK();
  }
  rocksdb::Status DeleteRangeCF(uint32_t /*column_family_id*/, const rocksdb::Slice & /*begin_key*/,
                                const rocksdb::Slice & /*end_key*/) override {
    return rocksdb::Status::OK();
  }

  // Accessors for the extracted data. Type() is const, like Key() and Value().
  // The type stays kBatchTypeNone unless PutCF sets it.
  WriteBatchType Type() const { return type_; }
  std::string Key() const { return kv_.first; }
  std::string Value() const { return kv_.second; }

 private:
  std::pair<std::string, std::string> kv_;
  WriteBatchType type_ = kBatchTypeNone;
};