blob: 06f2a2b3d54b653f6954b4961fef08fbb59147f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "manager/stmgrstate.h"
#include <iostream>
#include <string>
#include <vector>
#include "manager/tmanagerserver.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
namespace heron {
namespace tmanager {
// Creates the TManager-side bookkeeping for a newly registered stream manager.
//
// _conn      connection the stmgr registered over; used for all outgoing messages.
// _stmgr     registration descriptor; a private copy is kept.
// _instances instances the stmgr reported at registration time.
// _server    server used to send messages over _conn.
//
// The heartbeat clock starts at construction time, and no stats snapshot is held yet.
StMgrState::StMgrState(Connection* _conn, const proto::system::StMgr& _stmgr,
                       const std::vector<shared_ptr<proto::system::Instance>>& _instances,
                       Server& _server)
    : server_(_server) {
  last_heartbeat_ = time(nullptr);
  last_stats_ = nullptr;
  instances_ = _instances;
  stmgr_ = std::make_shared<proto::system::StMgr>(_stmgr);
  connection_ = _conn;
}
// Releases the last stats snapshot adopted via heartbeat(), if any.
StMgrState::~StMgrState() {
  proto::system::StMgrStats* stats = last_stats_;
  last_stats_ = nullptr;
  delete stats;
}
// Rebinds this state object to a newly (re-)registered stmgr with the same id.
//
// _stmgr     new registration descriptor; a private copy is kept.
// _instances instances reported by the new registration.
// _conn      connection of the new stmgr; subsequent messages go here.
//
// Any stats snapshot from the previous incarnation is discarded.
void StMgrState::UpdateWithNewStMgr(const proto::system::StMgr& _stmgr,
                                    const std::vector<shared_ptr<proto::system::Instance>>& _instances,
                                    Connection* _conn) {
  delete last_stats_;
  last_stats_ = nullptr;
  stmgr_ = std::make_shared<proto::system::StMgr>(_stmgr);
  instances_ = _instances;
  connection_ = _conn;
  // Registration counts as a liveness signal, just as in the constructor. Without
  // this reset the previous stmgr's (stale) heartbeat time would be kept, and
  // TimedOut() could report the newly connected stmgr as timed out before it has
  // sent any heartbeat of its own.
  last_heartbeat_ = time(nullptr);
}
// Returns true when _instances has the same size as the instances recorded for
// this stmgr and every recorded instance has a matching entry in _instances.
// Two instances match when instance_id, stmgr_id, task_id, component_index and
// component_name are all equal.
bool StMgrState::VerifyInstances(const std::vector<proto::system::Instance*>& _instances) {
  if (_instances.size() != instances_.size()) {
    return false;
  }

  auto same_instance = [](const proto::system::Instance& known,
                          const proto::system::Instance& candidate) {
    return known.instance_id() == candidate.instance_id() &&
           known.stmgr_id() == candidate.stmgr_id() &&
           known.info().task_id() == candidate.info().task_id() &&
           known.info().component_index() == candidate.info().component_index() &&
           known.info().component_name() == candidate.info().component_name();
  };

  for (const auto& known : instances_) {
    bool matched = false;
    for (const proto::system::Instance* candidate : _instances) {
      if (same_instance(*known, *candidate)) {
        matched = true;
        break;
      }
    }
    if (!matched) {
      return false;
    }
  }
  return true;
}
// Records a heartbeat from the stmgr and adopts _stats as the latest stats
// snapshot. This object takes ownership of _stats and deletes the previous
// snapshot.
void StMgrState::heartbeat(sp_int64, proto::system::StMgrStats* _stats) {
  // Right now we ignore the time supplied by the stmgr.
  // TODO(kramasamy): Figure out the right way here
  last_heartbeat_ = time(nullptr);
  proto::system::StMgrStats* previous = last_stats_;
  delete previous;
  last_stats_ = _stats;
}
// Forwards a restore-topology-state request to this stmgr over its connection.
void StMgrState::SendRestoreTopologyStateMessage(
    const proto::ckptmgr::RestoreTopologyStateRequest& _message) {
  const auto& stmgr_id = stmgr_->id();
  LOG(INFO) << "Sending restore topology state message to stmgr " << stmgr_id
            << " with checkpoint " << _message.checkpoint_id();
  server_.SendMessage(connection_, _message);
}
void StMgrState::SendStartStatefulProcessingMessage(const std::string& _checkpoint_id) {
LOG(INFO) << "Sending Start Stateful Processing message to stmgr " << stmgr_->id()
<< " with checkpoint " << _checkpoint_id;
proto::ckptmgr::StartStmgrStatefulProcessing message;
message.set_checkpoint_id(_checkpoint_id);
server_.SendMessage(connection_, message);
}
// Wraps a copy of _pplan in a NewPhysicalPlanMessage and sends it to this stmgr.
void StMgrState::NewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) {
  proto::stmgr::NewPhysicalPlanMessage update;
  // Protobuf message copy-assignment performs the same deep copy as CopyFrom().
  *update.mutable_new_pplan() = _pplan;
  LOG(INFO) << "Sending a new physical plan to stmgr " << stmgr_->id();
  server_.SendMessage(connection_, update);
}
// Forwards a start-stateful-checkpoint request to this stmgr.
void StMgrState::NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request) {
  const auto& stmgr_id = stmgr_->id();
  LOG(INFO) << "Sending a new stateful checkpoint request to stmgr: " << stmgr_id;
  server_.SendMessage(connection_, _request);
}
// Notifies this stmgr that a globally consistent checkpoint has been saved.
void StMgrState::SendCheckpointSavedMessage(
    const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
  const auto& checkpoint_id = _msg.consistent_checkpoint().checkpoint_id();
  LOG(INFO) << "Sending checkpoint saved message to stmgr: " << stmgr_->id()
            << " for checkpoint: " << checkpoint_id;
  server_.SendMessage(connection_, _msg);
}
/*
void
StMgrState::AddAssignment(const std::vector<pair<string, sp_int32> >& _assignments,
proto::system::Assignment* _assignment)
{
// A vector of <component_id, partition_id> pairs is given to us.
CHECK(_assignments.size() == workers_.size());
proto::system::NodeManagerAssignment* val = _assignment->add_assignments();
val->set_nodemgr_id(info_->nodemgr_id());
set<string>::iterator iter = workers_.begin();
for (sp_uint32 i = 0; i < _assignments.size(); ++i) {
proto::system::WorkerAssignment* wrkr = val->add_workers();
wrkr->set_worker_id(*iter);
wrkr->set_comp_id(_assignments[i].first);
wrkr->set_instance_id(_assignments[i].second);
++iter;
}
}
*/
bool StMgrState::TimedOut() const {
sp_int32 timeout =
config::HeronInternalsConfigReader::Instance()->GetHeronTmanagerStmgrStateTimeoutSec();
return (time(NULL) - last_heartbeat_) > timeout;
}
} // namespace tmanager
} // namespace heron