blob: f50e20109f637fc54fd875104d1641dc30320f3f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <dlfcn.h>
#include <string>
#include <thread>
#include "glog/logging.h"
#include "slave/slave.h"
#include "proto/messages.h"
#include "network/network.h"
#include "basics/basics.h"
#include "slave/imetrics-registrar-impl.h"
#include "slave/task-context-impl.h"
#include "spoutimpl/spout-instance.h"
#include "boltimpl/bolt-instance.h"
namespace heron {
namespace instance {
// Constructs the slave for one task and loads the topology shared library.
//
// myTaskId   - id of the task this slave serves; forwarded to TaskContextImpl.
// topologySo - path of the shared object handed to dlopen (RTLD_LAZY).
//
// The communicators start out null and are supplied later through
// setCommunicators(). A dlopen failure is reported at FATAL severity.
Slave::Slave(int myTaskId, const std::string& topologySo)
    // taskContext_ is built from the constructor parameter rather than from
    // myTaskId_, so it does not depend on the members' declaration order.
    : myTaskId_(myTaskId), taskContext_(new TaskContextImpl(myTaskId)),
      dataToSlave_(nullptr), dataFromSlave_(nullptr), metricsFromSlave_(nullptr),
      instance_(nullptr), eventLoop_(std::make_shared<EventLoopImpl>()) {
  // Remember the PhysicalPlan type name so HandleGatewayData can recognise
  // plan messages; a temporary is enough, no heap allocation is needed.
  pplan_typename_ = proto::system::PhysicalPlan().GetTypeName();

  dllHandle_ = dlopen(topologySo.c_str(), RTLD_LAZY);
  if (!dllHandle_) {
    // dlerror() may return a null pointer; never stream a null char*.
    const char* reason = dlerror();
    LOG(FATAL) << "Could not dlopen " << topologySo << " failed with error "
               << (reason ? reason : "unknown error");
  }
}
// Releases the shared-library handle obtained in the constructor. A failing
// dlclose is reported at FATAL severity together with the dlerror() text.
Slave::~Slave() {
  const int closeStatus = dlclose(dllHandle_);
  if (closeStatus == 0) {
    return;
  }
  LOG(FATAL) << "dlclose failed with error " << dlerror();
}
// Stores the three communicators used by this slave and installs a metrics
// registrar on the task context.
//
// dataToSlave      - communicator kept in dataToSlave_.
// dataFromSlave    - communicator kept in dataFromSlave_.
// metricsFromSlave - communicator kept in metricsFromSlave_; also handed to
//                    the IMetricsRegistrarImpl created here.
void Slave::setCommunicators(
    NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
    NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
    NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
  metricsFromSlave_ = metricsFromSlave;
  dataFromSlave_ = dataFromSlave;
  dataToSlave_ = dataToSlave;

  // The registrar shares this slave's event loop and the metrics communicator.
  std::shared_ptr<api::metric::IMetricsRegistrar> metricsRegistrar;
  metricsRegistrar.reset(new IMetricsRegistrarImpl(eventLoop_, metricsFromSlave_));
  taskContext_->setMericsRegistrar(metricsRegistrar);
}
void Slave::Start() {
LOG(INFO) << "Creating slave thread";
slaveThread_.reset(new std::thread(&Slave::InternalStart, this));
}
// This is the one thats running in the slave thread
void Slave::InternalStart() {
LOG(INFO) << "Slave thread started up";
eventLoop_->loop();
}
// Dispatches a message received from the gateway.
//
// A message whose type name equals the cached PhysicalPlan type name is
// routed to HandleNewPhysicalPlan(). Every other message is treated as a
// HeronTupleSet2 without any further check and routed to HandleStMgrTuples().
// Ownership of the message moves into the typed pointer passed on.
void Slave::HandleGatewayData(pool_unique_ptr<google::protobuf::Message> msg) {
  const bool isPhysicalPlan = (msg->GetTypeName() == pplan_typename_);

  if (!isPhysicalPlan) {
    auto* rawTuples = static_cast<proto::system::HeronTupleSet2*>(msg.release());
    HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2>(rawTuples));
    return;
  }

  LOG(INFO) << "Slave Received a new pplan message from Gateway";
  auto* rawPlan = static_cast<proto::system::PhysicalPlan*>(msg.release());
  HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan>(rawPlan));
}
// Applies a new physical plan.
//
// The plan is always forwarded to the task context first. If an instance
// already exists, it is activated or deactivated only when the topology
// state (RUNNING / PAUSED) disagrees with instance_->IsRunning(). Otherwise
// the spout or bolt instance is created, and started only when the topology
// state is RUNNING.
void Slave::HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan) {
  std::shared_ptr<proto::system::PhysicalPlan> sharedPlan(std::move(pplan));
  taskContext_->newPhysicalPlan(sharedPlan);

  if (instance_) {
    // Existing instance: only react to a change between RUNNING and PAUSED.
    const auto currentState = sharedPlan->topology().state();
    if (currentState == proto::api::TopologyState::RUNNING) {
      if (!instance_->IsRunning()) {
        instance_->Activate();
      }
    } else if (currentState == proto::api::TopologyState::PAUSED) {
      if (instance_->IsRunning()) {
        instance_->Deactivate();
      }
    }
    return;
  }

  LOG(INFO) << "Creating the instance for the first time";
  if (!taskContext_->isSpout()) {
    LOG(INFO) << "We are a bolt";
    instance_ = new BoltInstance(eventLoop_, taskContext_,
                                 dataToSlave_, dataFromSlave_, dllHandle_);
  } else {
    LOG(INFO) << "We are a spout";
    instance_ = new SpoutInstance(eventLoop_, taskContext_,
                                  dataFromSlave_, dllHandle_);
  }

  const bool topologyRunning =
      sharedPlan->topology().state() == proto::api::TopologyState::RUNNING;
  if (!topologyRunning) {
    LOG(INFO) << "Not starting the instance";
    return;
  }
  LOG(INFO) << "Starting the instance";
  instance_->Start();
  instance_->DoWork();
}
// Hands a tuple set over to the instance. Receiving tuples while no instance
// exists yet is reported at FATAL severity.
void Slave::HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
  if (!instance_) {
    LOG(FATAL) << "Received StMgr tuples before instance was instantiated";
    return;
  }
  instance_->HandleGatewayTuples(std::move(tupleSet));
}
// Lets the instance do more work via DoWork(); does nothing when no instance
// has been created yet.
void Slave::HandleGatewayDataConsumed() {
  if (!instance_) {
    return;
  }
  instance_->DoWork();
}
} // namespace instance
} // namespace heron