blob: 09d2b90d99e61e072495d902301c8fc454d60e16 [file] [log] [blame]
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include <dsn/dist/replication/replica_base.h>
#include "base/pegasus_rpc_types.h"
#include "pegasus_write_service.h"
namespace pegasus {
namespace server {
/// This class implements the interface of `pegasus_sever_impl::on_batched_write_requests`.
class pegasus_server_write : public dsn::replication::replica_base
{
public:
pegasus_server_write(pegasus_server_impl *server, bool verbose_log);
/// \return error code returned by rocksdb, i.e rocksdb::Status::code.
/// **NOTE**
/// Error returned is regarded as the failure of replica, thus will trigger
/// cluster membership changes. Make sure no error is returned because of
/// invalid user argument.
/// As long as the returned error is 0, the operation is guaranteed to be
/// successfully applied into rocksdb, which means an empty_put will be called
/// even if there's no write.
int on_batched_write_requests(dsn::message_ex **requests,
int count,
int64_t decree,
uint64_t timestamp);
void set_default_ttl(uint32_t ttl);
private:
/// Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn::message_ex **requests, int count);
int on_single_put_in_batch(put_rpc &rpc)
{
int err = _write_svc->batch_put(_write_ctx, rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request().key);
return err;
}
int on_single_remove_in_batch(remove_rpc &rpc)
{
int err = _write_svc->batch_remove(_decree, rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request());
return err;
}
// Ensure that the write request is directed to the right partition.
// In verbose mode it will log for every request.
void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key);
private:
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
std::unique_ptr<pegasus_write_service> _write_svc;
std::vector<put_rpc> _put_rpc_batch;
std::vector<remove_rpc> _remove_rpc_batch;
db_write_context _write_ctx;
int64_t _decree;
const bool _verbose_log;
};
} // namespace server
} // namespace pegasus