blob: 6d57ef4bd0edfe038fde727697bc07f14dc80c01 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <zookeeper/zookeeper.h>
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include "distributed_lock_service_zookeeper.h"
#include "lock_struct.h"
#include "lock_types.h"
#include "runtime/service_app.h"
#include "runtime/task/async_calls.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/strings.h"
#include "zookeeper/zookeeper_session_mgr.h"
#include "zookeeper_error.h"
#include "zookeeper_session.h"
namespace dsn {
namespace dist {
DSN_DECLARE_int32(timeout_ms);
std::string distributed_lock_service_zookeeper::LOCK_NODE_PREFIX = "LOCKNODE";
distributed_lock_service_zookeeper::distributed_lock_service_zookeeper() : ref_counter()
{
_first_call = true;
}
distributed_lock_service_zookeeper::~distributed_lock_service_zookeeper()
{
if (_session) {
std::vector<lock_struct_ptr> handle_vec;
{
utils::auto_write_lock l(_service_lock);
for (auto &kv : _zookeeper_locks)
handle_vec.push_back(kv.second);
_zookeeper_locks.clear();
}
for (lock_struct_ptr &ptr : handle_vec)
_session->detach(ptr.get());
_session->detach(this);
_session = nullptr;
}
}
error_code distributed_lock_service_zookeeper::finalize()
{
release_ref();
return ERR_OK;
}
void distributed_lock_service_zookeeper::erase(const lock_key &key)
{
utils::auto_write_lock l(_service_lock);
_zookeeper_locks.erase(key);
}
error_code distributed_lock_service_zookeeper::initialize(const std::vector<std::string> &args)
{
if (args.empty()) {
LOG_ERROR("need parameters: <lock_root>");
return ERR_INVALID_PARAMETERS;
}
const char *lock_root = args[0].c_str();
_session =
zookeeper_session_mgr::instance().get_session(service_app::current_service_app_info());
_zoo_state = _session->attach(this,
std::bind(&distributed_lock_service_zookeeper::on_zoo_session_evt,
lock_srv_ptr(this),
std::placeholders::_1));
if (_zoo_state != ZOO_CONNECTED_STATE) {
_waiting_attach.wait_for(FLAGS_timeout_ms);
if (_zoo_state != ZOO_CONNECTED_STATE) {
LOG_WARNING(
"attach to zookeeper session timeout, distributed lock service initialized failed");
return ERR_TIMEOUT;
}
}
std::vector<std::string> slices;
utils::split_args(lock_root, slices, '/');
std::string current = "";
for (auto &str : slices) {
utils::notify_event e;
int zerr;
current = current + "/" + str;
zookeeper_session::zoo_opcontext *op = zookeeper_session::create_context();
op->_optype = zookeeper_session::ZOO_CREATE;
op->_input._path = current;
op->_callback_function = [&e, &zerr](zookeeper_session::zoo_opcontext *op) mutable {
zerr = op->_output.error;
e.notify();
};
_session->visit(op);
e.wait();
if (zerr != ZOK && zerr != ZNODEEXISTS) {
LOG_ERROR("create zk node failed, path = {}, err = {}", current, zerror(zerr));
return from_zerror(zerr);
}
}
_lock_root = current.empty() ? "/" : current;
LOG_INFO("init distributed_lock_service_zookeeper succeed, lock_root = {}", _lock_root);
// Notice: this reference is released in the finalize
add_ref();
return ERR_OK;
}
std::pair<task_ptr, task_ptr>
distributed_lock_service_zookeeper::lock(const std::string &lock_id,
const std::string &myself_id,
task_code lock_cb_code,
const lock_callback &lock_cb,
task_code lease_expire_code,
const lock_callback &lease_expire_callback,
const lock_options &opt)
{
lock_struct_ptr handle;
{
utils::auto_write_lock l(_service_lock);
auto id_pair = std::make_pair(lock_id, myself_id);
auto iter = _zookeeper_locks.find(id_pair);
if (iter == _zookeeper_locks.end()) {
if (!opt.create_if_not_exist) {
task_ptr tsk = tasking::enqueue(
lock_cb_code, nullptr, std::bind(lock_cb, ERR_OBJECT_NOT_FOUND, "", -1));
return std::make_pair(tsk, nullptr);
} else {
handle = new lock_struct(lock_srv_ptr(this));
handle->initialize(lock_id, myself_id);
_zookeeper_locks[id_pair] = handle;
}
} else
handle = iter->second;
}
lock_future_ptr lock_tsk(new lock_future(lock_cb_code, lock_cb, 0));
lock_future_ptr expire_tsk(new lock_future(lease_expire_code, lease_expire_callback, 0));
tasking::enqueue(TASK_CODE_DLOCK,
nullptr,
std::bind(&lock_struct::try_lock, handle, lock_tsk, expire_tsk),
handle->hash());
return std::make_pair(lock_tsk, expire_tsk);
}
task_ptr distributed_lock_service_zookeeper::unlock(const std::string &lock_id,
const std::string &myself_id,
bool destroy,
task_code cb_code,
const err_callback &cb)
{
lock_struct_ptr handle;
{
utils::auto_read_lock l(_service_lock);
auto iter = _zookeeper_locks.find(std::make_pair(lock_id, myself_id));
if (iter == _zookeeper_locks.end())
return tasking::enqueue(cb_code, nullptr, std::bind(cb, ERR_OBJECT_NOT_FOUND));
handle = iter->second;
}
error_code_future_ptr unlock_tsk(new error_code_future(cb_code, cb, 0));
tasking::enqueue(TASK_CODE_DLOCK,
nullptr,
std::bind(&lock_struct::unlock, handle, unlock_tsk),
handle->hash());
return unlock_tsk;
}
task_ptr distributed_lock_service_zookeeper::cancel_pending_lock(const std::string &lock_id,
const std::string &myself_id,
task_code cb_code,
const lock_callback &cb)
{
lock_struct_ptr handle;
{
utils::auto_read_lock l(_service_lock);
auto iter = _zookeeper_locks.find(std::make_pair(lock_id, myself_id));
if (iter == _zookeeper_locks.end())
return tasking::enqueue(cb_code, nullptr, std::bind(cb, ERR_OBJECT_NOT_FOUND, "", -1));
handle = iter->second;
}
lock_future_ptr cancel_tsk(new lock_future(cb_code, cb, 0));
tasking::enqueue(TASK_CODE_DLOCK,
nullptr,
std::bind(&lock_struct::cancel_pending_lock, handle, cancel_tsk),
handle->hash());
return cancel_tsk;
}
task_ptr distributed_lock_service_zookeeper::query_lock(const std::string &lock_id,
task_code cb_code,
const lock_callback &cb)
{
std::string owner = "";
uint64_t version = -1;
error_code ec = query_cache(lock_id, owner, version);
return tasking::enqueue(cb_code, nullptr, std::bind(cb, ec, owner, version));
}
error_code distributed_lock_service_zookeeper::query_cache(const std::string &lock_id,
/*out*/ std::string &owner,
/*out*/ uint64_t &version)
{
utils::auto_read_lock l(_service_lock);
auto iter = _lock_cache.find(lock_id);
if (_lock_cache.end() == iter)
return ERR_OBJECT_NOT_FOUND;
else {
owner = iter->second.first;
version = iter->second.second;
return ERR_OK;
}
}
void distributed_lock_service_zookeeper::refresh_lock_cache(const std::string &lock_id,
const std::string &owner,
uint64_t version)
{
utils::auto_write_lock l(_service_lock);
_lock_cache[lock_id] = std::make_pair(owner, version);
}
void distributed_lock_service_zookeeper::dispatch_zookeeper_session_expire()
{
utils::auto_read_lock l(_service_lock);
for (auto &kv : _zookeeper_locks)
tasking::enqueue(TASK_CODE_DLOCK,
nullptr,
std::bind(&lock_struct::lock_expired, kv.second),
kv.second->hash());
}
/*static*/
/* this function runs in zookeeper do-completion thread */
void distributed_lock_service_zookeeper::on_zoo_session_evt(lock_srv_ptr _this, int zoo_state)
{
// TODO: better policy of zookeeper session response
_this->_zoo_state = zoo_state;
if (_this->_first_call && ZOO_CONNECTED_STATE == zoo_state) {
_this->_first_call = false;
_this->_waiting_attach.notify();
return;
}
if (ZOO_EXPIRED_SESSION_STATE == zoo_state || ZOO_AUTH_FAILED_STATE == zoo_state) {
LOG_ERROR("get zoo state: {}, which means the session is expired",
zookeeper_session::string_zoo_state(zoo_state));
_this->dispatch_zookeeper_session_expire();
} else {
LOG_WARNING("get zoo state: {}, ignore it", zookeeper_session::string_zoo_state(zoo_state));
}
}
}
}