blob: f0edf6b41344af3203992c95a4d82f3234b9700c [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "statemgr/heron-localfilestatemgr.h"
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
namespace heron {
namespace common {
HeronLocalFileStateMgr::HeronLocalFileStateMgr(const std::string& _topleveldir,
EventLoop* eventLoop)
: HeronStateMgr(_topleveldir), eventLoop_(eventLoop) {
InitTree();
}
HeronLocalFileStateMgr::~HeronLocalFileStateMgr() {
// nothing really
}
void HeronLocalFileStateMgr::InitTree() {
sp_string dpath = GetTopLevelDir();
sp_string path = dpath;
path += "/topologies";
FileUtils::makeDirectory(path);
path = dpath;
path += "/tmasters";
FileUtils::makeDirectory(path);
path = dpath;
path += "/pplans";
FileUtils::makeDirectory(path);
path = dpath;
path += "/packingplans";
FileUtils::makeDirectory(path);
path = dpath;
path += "/executionstate";
FileUtils::makeDirectory(path);
path = dpath;
path += "/statefulcheckpoints";
FileUtils::makeDirectory(path);
}
void HeronLocalFileStateMgr::SetTMasterLocationWatch(const std::string& topology_name,
VCallback<> watcher) {
CHECK(watcher);
// We kind of cheat here. We check periodically
time_t tmaster_last_change = FileUtils::getModifiedTime(GetTMasterLocationPath(topology_name));
auto cb = [topology_name, tmaster_last_change, watcher, this](EventLoop::Status status) {
this->CheckTMasterLocation(topology_name, tmaster_last_change, std::move(watcher), status);
};
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
void HeronLocalFileStateMgr::SetMetricsCacheLocationWatch(const std::string& topology_name,
VCallback<> watcher) {
CHECK(watcher);
// We kind of cheat here. We check periodically
time_t tmaster_last_change = FileUtils::getModifiedTime(
GetMetricsCacheLocationPath(topology_name));
auto cb = [topology_name, tmaster_last_change, watcher, this](EventLoop::Status status) {
this->CheckMetricsCacheLocation(topology_name, tmaster_last_change, std::move(watcher), status);
};
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
void HeronLocalFileStateMgr::SetPackingPlanWatch(const std::string& topology_name,
VCallback<> watcher) {
CHECK(watcher);
// We kind of cheat here. We check periodically
time_t packingplan_last_change = FileUtils::getModifiedTime(GetPackingPlanPath(topology_name));
auto cb = [topology_name, packingplan_last_change, watcher, this](EventLoop::Status status) {
this->CheckPackingPlan(topology_name, packingplan_last_change, std::move(watcher), status);
};
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
void HeronLocalFileStateMgr::GetTMasterLocation(const std::string& _topology_name,
proto::tmaster::TMasterLocation* _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
ReadAllFileContents(GetTMasterLocationPath(_topology_name), contents);
if (status == proto::system::OK) {
if (!_return->ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::GetMetricsCacheLocation(const std::string& _topology_name,
proto::tmaster::MetricsCacheLocation* _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
ReadAllFileContents(GetMetricsCacheLocationPath(_topology_name), contents);
if (status == proto::system::OK) {
if (!_return->ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::SetTMasterLocation(const proto::tmaster::TMasterLocation& _location,
VCallback<proto::system::StatusCode> cb) {
// Note: Unlike Zk statemgr, we overwrite the location even if there is already one.
// This is because when running in simulator we control when a tmaster dies and
// comes up deterministically.
std::string fname = GetTMasterLocationPath(_location.topology_name());
std::string contents;
_location.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::SetMetricsCacheLocation(
const proto::tmaster::MetricsCacheLocation& _location,
VCallback<proto::system::StatusCode> cb) {
// Note: Unlike Zk statemgr, we overwrite the location even if there is already one.
// This is because when running in simulator we control when a tmaster dies and
// comes up deterministically.
std::string fname = GetMetricsCacheLocationPath(_location.topology_name());
std::string contents;
_location.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::CreateTopology(const proto::api::Topology& _topology,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetTopologyPath(_topology.name());
// First check to see if location exists.
if (MakeSureFileDoesNotExist(fname) != proto::system::OK) {
auto wCb = [cb](EventLoop::Status) { cb(proto::system::PATH_ALREADY_EXISTS); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
return;
}
std::string contents;
_topology.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::DeleteTopology(const sp_string& _topology_name,
VCallback<proto::system::StatusCode> cb) {
proto::system::StatusCode status = DeleteFile(GetTopologyPath(_topology_name));
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::SetTopology(const proto::api::Topology& _topology,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetTopologyPath(_topology.name());
std::string contents;
_topology.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::GetTopology(const std::string& _topology_name,
proto::api::Topology* _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status = ReadAllFileContents(GetTopologyPath(_topology_name), contents);
if (status == proto::system::OK) {
if (!_return->ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::CreatePhysicalPlan(const proto::system::PhysicalPlan& _pplan,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetPhysicalPlanPath(_pplan.topology().name());
// First check to see if location exists.
if (MakeSureFileDoesNotExist(fname) != proto::system::OK) {
auto wCb = [cb](EventLoop::Status) { cb(proto::system::PATH_ALREADY_EXISTS); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
return;
}
std::string contents;
_pplan.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::DeletePhysicalPlan(const sp_string& _topology_name,
VCallback<proto::system::StatusCode> cb) {
proto::system::StatusCode status = DeleteFile(GetPhysicalPlanPath(_topology_name));
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::SetPhysicalPlan(const proto::system::PhysicalPlan& _pplan,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
_pplan.SerializeToString(&contents);
proto::system::StatusCode status =
WriteToFile(GetPhysicalPlanPath(_pplan.topology().name()), contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::GetPhysicalPlan(const std::string& _topology_name,
proto::system::PhysicalPlan* _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
ReadAllFileContents(GetPhysicalPlanPath(_topology_name), contents);
if (status == proto::system::OK) {
if (!_return->ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::CreatePackingPlan(const std::string& _topology_name,
const proto::system::PackingPlan& _packingPlan,
VCallback<proto::system::StatusCode> _cb) {
std::string fname = GetPackingPlanPath(_topology_name);
std::string contents;
_packingPlan.SerializeToString(&contents);
WriteToFile(fname, contents);
}
void HeronLocalFileStateMgr::GetPackingPlan(const std::string& _topology_name,
proto::system::PackingPlan* _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
ReadAllFileContents(GetPackingPlanPath(_topology_name), contents);
if (status == proto::system::OK) {
if (!_return->ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::CreateExecutionState(const proto::system::ExecutionState& _st,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetExecutionStatePath(_st.topology_name());
// First check to see if location exists.
if (MakeSureFileDoesNotExist(fname) != proto::system::OK) {
auto wCb = [cb](EventLoop::Status) { cb(proto::system::PATH_ALREADY_EXISTS); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
return;
}
std::string contents;
_st.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::DeleteExecutionState(const std::string& _topology_name,
VCallback<proto::system::StatusCode> cb) {
proto::system::StatusCode status = DeleteFile(GetExecutionStatePath(_topology_name));
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::GetExecutionState(const std::string& _topology_name,
proto::system::ExecutionState* _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
ReadAllFileContents(GetExecutionStatePath(_topology_name), contents);
if (status == proto::system::OK) {
if (!_return->ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::SetExecutionState(const proto::system::ExecutionState& _st,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetExecutionStatePath(_st.topology_name());
std::string contents;
_st.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::CreateStatefulCheckpoints(const std::string& _topology_name,
const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetStatefulCheckpointsPath(_topology_name);
// First check to see if location exists.
if (MakeSureFileDoesNotExist(fname) != proto::system::OK) {
auto wCb = [cb](EventLoop::Status) { cb(proto::system::PATH_ALREADY_EXISTS); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
return;
}
std::string contents;
_ckpt.SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::DeleteStatefulCheckpoints(const std::string& _topology_name,
VCallback<proto::system::StatusCode> cb) {
proto::system::StatusCode status = DeleteFile(GetStatefulCheckpointsPath(_topology_name));
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::SetStatefulCheckpoints(const std::string& _topology_name,
const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
_ckpt.SerializeToString(&contents);
proto::system::StatusCode status =
WriteToFile(GetStatefulCheckpointsPath(_topology_name), contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::GetStatefulCheckpoints(const std::string& _topology_name,
proto::ckptmgr::StatefulConsistentCheckpoints* _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
ReadAllFileContents(GetStatefulCheckpointsPath(_topology_name), contents);
if (status == proto::system::OK) {
if (!_return->ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::ListExecutionStateTopologies(std::vector<sp_string>* _return,
VCallback<proto::system::StatusCode> cb) {
proto::system::StatusCode status = proto::system::OK;
if (FileUtils::listFiles(GetExecutionStateDir(), *_return) != 0) {
status = proto::system::NOTOK;
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
void HeronLocalFileStateMgr::ListTopologies(std::vector<sp_string>* _return,
VCallback<proto::system::StatusCode> cb) {
proto::system::StatusCode status = proto::system::OK;
if (FileUtils::listFiles(GetTopologyDir(), *_return) != 0) {
status = proto::system::NOTOK;
}
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
proto::system::StatusCode HeronLocalFileStateMgr::ReadAllFileContents(const std::string& _filename,
std::string& _contents) {
std::ifstream in(_filename.c_str(), std::ios::in | std::ios::binary);
if (in) {
in.seekg(0, std::ios::end);
_contents.resize(in.tellg());
in.seekg(0, std::ios::beg);
in.read(&_contents[0], _contents.size());
in.close();
return proto::system::OK;
} else {
// We could not open the file
LOG(ERROR) << "Error reading from " << _filename << " with errno " << errno << "\n";
return proto::system::PATH_DOES_NOT_EXIST;
}
}
proto::system::StatusCode HeronLocalFileStateMgr::WriteToFile(const std::string& _filename,
const std::string& _contents) {
const std::string tmp_filename = _filename + ".tmp";
::unlink(tmp_filename.c_str());
std::ofstream ot(tmp_filename.c_str(), std::ios::out | std::ios::binary);
if (ot) {
ot << _contents;
ot.close();
if (rename(tmp_filename.c_str(), _filename.c_str()) != 0) {
LOG(ERROR) << "Rename failed from " << tmp_filename << " to " << _filename << "\n";
return proto::system::STATE_WRITE_ERROR;
} else {
return proto::system::OK;
}
} else {
LOG(ERROR) << "Error writing to " << _filename << " with errno " << errno << std::endl;
return proto::system::STATE_WRITE_ERROR;
}
}
proto::system::StatusCode HeronLocalFileStateMgr::DeleteFile(const std::string& _filename) {
if (remove(_filename.c_str()) != 0) {
return proto::system::NOTOK;
} else {
return proto::system::OK;
}
}
proto::system::StatusCode HeronLocalFileStateMgr::MakeSureFileDoesNotExist(
const std::string& _filename) {
std::ifstream in(_filename.c_str(), std::ios::in | std::ios::binary);
if (in) {
// it already exists.
in.close();
return proto::system::PATH_ALREADY_EXISTS;
} else {
return proto::system::OK;
}
}
void HeronLocalFileStateMgr::CheckTMasterLocation(std::string topology_name, time_t last_change,
VCallback<> watcher, EventLoop::Status) {
time_t nlast_change = FileUtils::getModifiedTime(GetTMasterLocationPath(topology_name));
if (nlast_change > last_change) {
watcher();
} else {
nlast_change = last_change;
}
auto cb = [topology_name, nlast_change, watcher, this](EventLoop::Status status) {
this->CheckTMasterLocation(topology_name, nlast_change, std::move(watcher), status);
};
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
void HeronLocalFileStateMgr::CheckMetricsCacheLocation(
std::string topology_name, time_t last_change,
VCallback<> watcher, EventLoop::Status) {
time_t nlast_change = FileUtils::getModifiedTime(GetMetricsCacheLocationPath(topology_name));
if (nlast_change > last_change) {
watcher();
} else {
nlast_change = last_change;
}
auto cb = [topology_name, nlast_change, watcher, this](EventLoop::Status status) {
this->CheckMetricsCacheLocation(topology_name, nlast_change, std::move(watcher), status);
};
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
void HeronLocalFileStateMgr::CheckPackingPlan(std::string topology_name, time_t last_change,
VCallback<> watcher, EventLoop::Status) {
time_t nlast_change = FileUtils::getModifiedTime(GetPackingPlanPath(topology_name));
if (nlast_change > last_change) {
watcher();
} else {
nlast_change = last_change;
}
auto cb = [topology_name, nlast_change, watcher, this](EventLoop::Status status) {
this->CheckPackingPlan(topology_name, nlast_change, std::move(watcher), status);
};
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
} // namespace common
} // namespace heron