blob: 4ad2977367ce418d116787a88f3963a21dadbe5f [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "manager/ckptmgr-client.h"
#include <iostream>
#include <string>
#include "errors/errors.h"
#include "threads/threads.h"
#include "config/heron-internals-config-reader.h"
namespace heron {
namespace stmgr {
// Constructs a client that talks to the checkpoint manager `_ckptmgr_id`.
//
// The identifiers and callbacks are stored for later use:
//   _ckpt_saved_watcher - called with (instance, checkpoint id) by
//                         HandleSaveInstanceStateResponse after a successful save.
//   _ckpt_get_watcher   - called with (status, task id, checkpoint id, checkpoint)
//                         by HandleGetInstanceStateResponse.
//   _register_watcher   - called by HandleRegisterStMgrResponse after an OK
//                         registration.
CkptMgrClient::CkptMgrClient(
    EventLoop* eventloop, const NetworkOptions& _options,
    const sp_string& _topology_name, const sp_string& _topology_id,
    const sp_string& _ckptmgr_id, const sp_string& _stmgr_id,
    std::function<void(const proto::system::Instance&,
                       const std::string&)> _ckpt_saved_watcher,
    std::function<void(proto::system::StatusCode, sp_int32, sp_string,
                       const proto::ckptmgr::InstanceStateCheckpoint&)> _ckpt_get_watcher,
    std::function<void()> _register_watcher)
    : Client(eventloop, _options),
      topology_name_(_topology_name),
      topology_id_(_topology_id),
      ckptmgr_id_(_ckptmgr_id),
      stmgr_id_(_stmgr_id),
      quit_(false),
      ckpt_saved_watcher_(_ckpt_saved_watcher),
      ckpt_get_watcher_(_ckpt_get_watcher),
      register_watcher_(_register_watcher) {
  // Delay used before each reconnect attempt (see HandleConnect / HandleClose).
  // TODO(nlu): take the value from config
  reconnect_cpktmgr_interval_sec_ = 10;

  // Pair every request type this client sends with the member function that
  // handles the matching response.
  InstallResponseHandler(new proto::ckptmgr::RegisterStMgrRequest(),
                         &CkptMgrClient::HandleRegisterStMgrResponse);
  InstallResponseHandler(new proto::ckptmgr::SaveInstanceStateRequest(),
                         &CkptMgrClient::HandleSaveInstanceStateResponse);
  InstallResponseHandler(new proto::ckptmgr::GetInstanceStateRequest(),
                         &CkptMgrClient::HandleGetInstanceStateResponse);
}
// Stops the client when it is destroyed.
CkptMgrClient::~CkptMgrClient() { Stop(); }
// Flags the client as quitting and then stops it.
//
// With quit_ set, HandleClose (and HandleConnect on a failed connect) delete
// this object instead of scheduling another reconnect.
void CkptMgrClient::Quit() {
  quit_ = true;
  Stop();
}
// Reacts to the outcome of a connect attempt.
//
// _status == OK : registers with the ckptmgr, or stops right away when Quit()
//                 was requested in the meantime.
// otherwise     : deletes this object when quitting, else schedules
//                 OnReconnectTimer to try again after the reconnect interval.
void CkptMgrClient::HandleConnect(NetworkErrorCode _status) {
  if (_status == OK) {
    LOG(INFO) << "Connected to ckptmgr " << ckptmgr_id_ << " running at "
              << get_clientoptions().get_host() << ":" << get_clientoptions().get_port()
              << std::endl;
    if (quit_) {
      Stop();
    } else {
      SendRegisterRequest();
    }
    return;
  }

  // The message used to read "cpktmgr<id>": misspelt and with no separator
  // between the word and the id.
  LOG(WARNING) << "Could not connect to ckptmgr " << ckptmgr_id_ << " running at "
               << get_clientoptions().get_host() << ":" << get_clientoptions().get_port()
               << " due to: " << _status << std::endl;

  if (quit_) {
    LOG(ERROR) << "Quitting";
    // No member may be touched after this point.
    delete this;
    return;
  }

  LOG(INFO) << "Retrying again..." << std::endl;
  // The interval is kept in seconds; presumably AddTimer expects microseconds.
  AddTimer([this]() { this->OnReconnectTimer(); },
           reconnect_cpktmgr_interval_sec_ * 1000 * 1000);
}
// Reacts to the connection being closed.
//
// Logs who closed the connection (_code == OK means this side did), then
// either deletes this object when quitting or schedules OnReconnectTimer to
// reconnect after the reconnect interval.
void CkptMgrClient::HandleClose(NetworkErrorCode _code) {
  // Both messages used to be malformed: "cpktmgr" was misspelt and the second
  // one printed "Ckptmgr<id>" with no separator.
  if (_code == OK) {
    LOG(INFO) << "We closed our server connection with ckptmgr " << ckptmgr_id_
              << " running at " << get_clientoptions().get_host() << ":"
              << get_clientoptions().get_port() << std::endl;
  } else {
    LOG(INFO) << "Ckptmgr " << ckptmgr_id_ << " running at "
              << get_clientoptions().get_host() << ":" << get_clientoptions().get_port()
              << " closed connection with code " << _code << std::endl;
  }

  if (quit_) {
    // No member may be touched after this point.
    delete this;
    return;
  }

  LOG(INFO) << "Will try to reconnect again..." << std::endl;
  // The interval is kept in seconds; presumably AddTimer expects microseconds.
  AddTimer([this]() { this->OnReconnectTimer(); },
           reconnect_cpktmgr_interval_sec_ * 1000 * 1000);
}
void CkptMgrClient::HandleRegisterStMgrResponse(void*,
proto::ckptmgr::RegisterStMgrResponse* _response,
NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK network code " << _status << " for register response from ckptmgr "
<< ckptmgr_id_ << "running at " << get_clientoptions().get_host() << ":"
<< get_clientoptions().get_port();
delete _response;
Stop();
return;
}
proto::system::StatusCode status = _response->status().status();
if (status != proto::system::OK) {
LOG(ERROR) << "NonOK register response " << status << " from ckptmgr " << ckptmgr_id_
<< " running at " << get_clientoptions().get_host() << ":"
<< get_clientoptions().get_port();
Stop();
} else {
LOG(INFO) << "Register request got response OK from ckptmgr " << ckptmgr_id_
<< " running at " << get_clientoptions().get_host() << ":"
<< get_clientoptions().get_port();
register_watcher_();
}
delete _response;
}
// Timer callback scheduled by HandleConnect / HandleClose: starts the client
// again.
void CkptMgrClient::OnReconnectTimer() {
  Start();
}
// Builds a RegisterStMgrRequest carrying this client's topology name,
// topology id and stmgr id, and sends it with no context pointer. The reply
// arrives in HandleRegisterStMgrResponse.
void CkptMgrClient::SendRegisterRequest() {
  auto register_request = new proto::ckptmgr::RegisterStMgrRequest();
  register_request->set_topology_name(topology_name_);
  register_request->set_topology_id(topology_id_);
  register_request->set_stmgr_id(stmgr_id_);

  SendRequest(register_request, nullptr);
}
// Sends the caller-built save request to the ckptmgr with no context pointer.
// The reply arrives in HandleSaveInstanceStateResponse.
void CkptMgrClient::SaveInstanceState(proto::ckptmgr::SaveInstanceStateRequest* _request) {
  LOG(INFO) << "Sending SaveInstanceState to ckptmgr" << std::endl;

  SendRequest(_request, nullptr);
}
// Requests the state of `_instance` stored under `_checkpoint_id`.
//
// Allocates an attempt counter starting at zero and passes it to the
// three-argument overload, which sends it along as the request's context
// pointer. HandleGetInstanceStateResponse reads that context as this counter.
void CkptMgrClient::GetInstanceState(const proto::system::Instance& _instance,
                                     const std::string& _checkpoint_id) {
  LOG(INFO) << "Sending GetInstanceState to ckptmgr for task_id " << _instance.info().task_id()
            << " and checkpoint_id " << _checkpoint_id;

  int32_t* attempt_counter = new int32_t(0);
  GetInstanceState(_instance, _checkpoint_id, attempt_counter);
}
// Builds a GetInstanceStateRequest for `_instance` / `_checkpoint_id` and
// sends it with `_nattempts` as the context pointer, so the attempt count
// travels with the request. Used for the first attempt and for retries.
void CkptMgrClient::GetInstanceState(const proto::system::Instance& _instance,
                                     const std::string& _checkpoint_id,
                                     int32_t* _nattempts) {
  auto get_request = new proto::ckptmgr::GetInstanceStateRequest();
  get_request->set_checkpoint_id(_checkpoint_id);
  get_request->mutable_instance()->CopyFrom(_instance);

  SendRequest(get_request, _nattempts);
}
void CkptMgrClient::HandleSaveInstanceStateResponse(void*,
proto::ckptmgr::SaveInstanceStateResponse* _response,
NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK response message for SaveInstanceStateResponse";
delete _response;
Stop();
return;
}
if (_response->status().status() != proto::system::OK) {
LOG(ERROR) << "CkptMgr could not save " << _response->status().status();
delete _response;
return;
}
ckpt_saved_watcher_(_response->instance(), _response->checkpoint_id());
delete _response;
}
void CkptMgrClient::HandleGetInstanceStateResponse(void* _ctx,
proto::ckptmgr::GetInstanceStateResponse* _response,
NetworkErrorCode _status) {
int32_t* nattempts = static_cast<int32_t*>(_ctx);
if (_status != OK) {
LOG(ERROR) << "NonOK response message for GetInstanceStateResponse";
delete _response;
delete nattempts;
Stop();
return;
}
if (_response->status().status() != proto::system::OK) {
LOG(ERROR) << "CkptMgr could not get checkpoint for "
<< _response->instance().info().task_id()
<< " and checkpoint_id " << _response->checkpoint_id()
<< " because of reason: " << _response->status().status();
*nattempts = *nattempts + 1;
if (*nattempts >= 5) {
LOG(ERROR) << "Not Retrying because already tried too many times";
delete nattempts;
ckpt_get_watcher_(_response->status().status(),
_response->instance().info().task_id(),
_response->checkpoint_id(), _response->checkpoint());
} else {
LOG(INFO) << "Retrying...";
GetInstanceState(_response->instance(), _response->checkpoint_id(), nattempts);
}
} else {
ckpt_get_watcher_(_response->status().status(),
_response->instance().info().task_id(), _response->checkpoint_id(),
_response->checkpoint());
}
delete _response;
}
} // namespace stmgr
} // namespace heron