blob: f6a1b75918d2c5f0127890b8904c161da04886f3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/replication.codes.h"
#include "meta/meta_state_service.h"
#include "meta_state_service_utils.h"
#include "runtime/pipeline.h"
#include "utils/fmt_logging.h"
#include "utils/fmt_utils.h"
namespace dsn {
namespace replication {
namespace mss {
struct op_type
{
enum type
{
OP_NONE,
OP_CREATE_RECURSIVELY,
OP_CREATE,
OP_DELETE_RECURSIVELY,
OP_DELETE,
OP_SET_DATA,
OP_GET_DATA,
OP_GET_CHILDREN,
};
static const char *to_string(type v)
{
static const char *op_type_to_string_map[] = {
"OP_CREATE_RECURSIVELY",
"OP_CREATE",
"OP_DELETE_RECURSIVELY",
"OP_DELETE",
"OP_SET_DATA",
"OP_GET_DATA",
"OP_GET_CHILDREN",
};
CHECK(v != OP_NONE && v <= (sizeof(op_type_to_string_map) / sizeof(char *)),
"invalid type: {}",
v);
return op_type_to_string_map[v - 1];
}
};
USER_DEFINED_ENUM_FORMATTER(op_type::type)
/// Base class for all operations.
struct operation : pipeline::environment
{
void initialize(meta_storage *storage)
{
_ms = storage;
task_tracker(storage->_tracker).thread_pool(LPC_META_STATE_HIGH);
}
// The common strategy for error handling:
// retry after 1 sec if timeout, or terminate.
template <typename T>
void on_error(T *this_instance, op_type::type type, error_code ec, const std::string &path)
{
if (ec == ERR_TIMEOUT) {
LOG_WARNING("request({}) on path({}) was timeout, retry after 1 second",
op_type::to_string(type),
path);
pipeline::repeat(std::move(*this_instance), 1_s);
return;
}
CHECK(false,
"request({}) on path({}) encountered an unexpected error({})",
op_type::to_string(type),
path,
ec.to_string());
}
dist::meta_state_service *remote_storage() const { return _ms->_remote; }
dsn::task_tracker *tracker() const { return _ms->_tracker; }
private:
meta_storage *_ms{nullptr};
};
// Developer Notes:
//
// As a concern of performance, arguments are wrapped into a shared_ptr to be used
// in callback of meta_state_service without copying.
//
// To be able to repeat the internal task using pipeline::repeat, the operations must
// implement `void run()` method.
//
struct on_create_recursively : operation
{
struct arguments
{
std::function<void()> cb;
dsn::blob val;
std::queue<std::string> nodes;
};
std::shared_ptr<arguments> args;
// ASSERTED: !args->nodes.empty
void run()
{
// first node
if (_cur_path.empty()) { // first node requires leading '/'
_cur_path += args->nodes.front();
args->nodes.pop();
}
remote_storage()->create_node(_cur_path,
LPC_META_STATE_HIGH,
[op = *this](error_code ec) mutable { op.on_error(ec); },
args->nodes.empty() ? args->val : blob(),
tracker());
}
void on_error(error_code ec)
{
if (ec == ERR_OK || ec == ERR_NODE_ALREADY_EXIST) {
// create next node
if (!args->nodes.empty()) {
_cur_path += "/" + args->nodes.front();
args->nodes.pop();
pipeline::repeat(std::move(*this));
} else {
args->cb();
_cur_path.clear();
}
return;
}
operation::on_error(this, op_type::OP_CREATE_RECURSIVELY, ec, _cur_path);
}
private:
std::string _cur_path;
};
struct on_create : operation
{
struct arguments
{
std::function<void()> cb;
dsn::blob val;
std::string node;
};
std::shared_ptr<arguments> args;
void run()
{
remote_storage()->create_node(args->node,
LPC_META_STATE_HIGH,
[op = *this](error_code ec) mutable { op.on_error(ec); },
args->val,
tracker());
}
void on_error(error_code ec)
{
if (ec == ERR_OK || ec == ERR_NODE_ALREADY_EXIST) {
args->cb();
return;
}
operation::on_error(this, op_type::OP_CREATE, ec, args->node);
}
};
struct on_delete : operation
{
struct arguments
{
std::function<void()> cb;
std::string node;
bool is_recursively_delete{false};
};
std::shared_ptr<arguments> args;
void run()
{
remote_storage()->delete_node(args->node,
args->is_recursively_delete,
LPC_META_STATE_HIGH,
[op = *this](error_code ec) mutable { op.on_error(ec); },
tracker());
}
void on_error(error_code ec)
{
if (ec == ERR_OK || ec == ERR_OBJECT_NOT_FOUND) {
args->cb();
return;
}
auto type =
args->is_recursively_delete ? op_type::OP_DELETE_RECURSIVELY : op_type::OP_DELETE;
operation::on_error(this, type, ec, args->node);
}
};
struct on_get_data : operation
{
struct arguments
{
std::function<void(const blob &)> cb;
std::string node;
};
std::shared_ptr<arguments> args;
void run()
{
remote_storage()->get_data(
args->node,
LPC_META_STATE_HIGH,
[op = *this](error_code ec, const blob &val) mutable { op.on_error(ec, val); },
tracker());
}
void on_error(error_code ec, const blob &val)
{
if (ec == ERR_OK || ec == ERR_OBJECT_NOT_FOUND) {
args->cb(val);
return;
}
operation::on_error(this, op_type::OP_GET_DATA, ec, args->node);
}
};
struct on_set_data : operation
{
struct arguments
{
std::function<void()> cb;
std::string node;
dsn::blob val;
};
std::shared_ptr<arguments> args;
void run()
{
remote_storage()->set_data(args->node,
args->val,
LPC_META_STATE_HIGH,
[op = *this](error_code ec) mutable { op.on_error(ec); },
tracker());
}
void on_error(error_code ec)
{
if (ec == ERR_OK) {
args->cb();
return;
}
operation::on_error(this, op_type::OP_SET_DATA, ec, args->node);
}
};
struct on_get_children : operation
{
struct arguments
{
std::function<void(bool, const std::vector<std::string> &)> cb;
std::string node;
};
std::shared_ptr<arguments> args;
void run()
{
remote_storage()->get_children(
args->node,
LPC_META_STATE_HIGH,
[op = *this](error_code ec, const std::vector<std::string> &children) mutable {
op.on_error(ec, children);
},
tracker());
}
void on_error(error_code ec, const std::vector<std::string> &children)
{
if (ec == ERR_OK) {
args->cb(true, children);
return;
}
if (ec == ERR_OBJECT_NOT_FOUND) {
args->cb(false, children);
return;
}
operation::on_error(this, op_type::OP_GET_CHILDREN, ec, args->node);
}
};
} // namespace mss
} // namespace replication
} // namespace dsn