blob: 37cba543639cbb942dabdf206f508b143087e3fa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <dlfcn.h>
#include <stdlib.h>
#include <list>
#include <string>
#include "boltimpl/bolt-instance.h"
#include "proto/messages.h"
#include "network/network.h"
#include "basics/basics.h"
#include "config/heron-internals-config-reader.h"
#include "boltimpl/tuple-impl.h"
#include "boltimpl/tick-tuple.h"
namespace heron {
namespace instance {
BoltInstance::BoltInstance(std::shared_ptr<EventLoop> eventLoop,
std::shared_ptr<TaskContextImpl> taskContext,
NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
void* dllHandle)
: taskContext_(taskContext), dataToSlave_(dataToSlave),
dataFromSlave_(dataFromSlave), eventLoop_(eventLoop), bolt_(NULL), active_(false),
tickTimer_(-1) {
maxWriteBufferSize_ = config::HeronInternalsConfigReader::Instance()
->GetHeronInstanceInternalBoltWriteQueueCapacity();
api::bolt::IBolt* (*creatorFunction)();
creatorFunction = (api::bolt::IBolt* (*)())dlsym(dllHandle,
taskContext->getComponentConstructor().c_str());
if (!creatorFunction) {
LOG(FATAL) << "dlsym failed for " << taskContext->getComponentConstructor()
<< " with error " << dlerror();
}
bolt_ = (*creatorFunction)();
if (!bolt_) {
LOG(FATAL) << "Attempt to create bolt using " << taskContext->getComponentConstructor()
<< " failed with error " << dlerror();
}
serializer_.reset(api::serializer::IPluggableSerializer::createSerializer(
taskContext_->getConfig()));
metrics_.reset(new BoltMetrics(taskContext->getMetricsRegistrar()));
collector_.reset(new BoltOutputCollectorImpl(serializer_, taskContext_,
dataFromSlave_, metrics_));
}
BoltInstance::~BoltInstance() {
bolt_->cleanup();
delete bolt_;
if (tickTimer_ > 0) {
eventLoop_->unRegisterTimer(tickTimer_);
}
}
void BoltInstance::Start() {
CHECK(!active_);
LOG(INFO) << "Starting bolt " << taskContext_->getThisComponentName();
bolt_->open(taskContext_->getConfig(), taskContext_, collector_);
if (taskContext_->getConfig()->hasConfig(api::config::Config::TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
int tickTimerSecs = atoi(taskContext_->getConfig()
->get(api::config::Config::TOPOLOGY_TICK_TUPLE_FREQ_SECS).c_str());
int64_t timeoutMs = tickTimerSecs * 1000 * 1000;
tickTimer_ = eventLoop_->registerTimer(
[this](EventLoop::Status status) { this->onTickTimer(); }, true, timeoutMs);
CHECK_GT(tickTimer_, 0);
}
active_ = true;
}
void BoltInstance::Activate() {
LOG(INFO) << "Not doing anything in Bolt Activate";
CHECK(!active_);
active_ = true;
}
void BoltInstance::Deactivate() {
LOG(INFO) << "Not doing anything in Bolt Dacctivate";
CHECK(active_);
active_ = false;
}
void BoltInstance::DoWork() {
dataToSlave_->resumeConsumption();
}
void BoltInstance::executeTuple(const proto::api::StreamId& stream,
std::shared_ptr<const proto::system::HeronDataTuple> tup) {
std::shared_ptr<TupleImpl> t(new TupleImpl(serializer_, taskContext_, stream, tup));
int64_t startTime = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
bolt_->execute(t);
int64_t endTime = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
metrics_->executeTuple(stream.id(), stream.component_name(), endTime - startTime);
}
void BoltInstance::HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
if (tupleSet->has_control()) {
LOG(FATAL) << "Bolt cannot get incoming control tuples from other components";
}
if (tupleSet->has_data()) {
for (int i = 0; i < tupleSet->data().tuples_size(); ++i) {
auto t = new proto::system::HeronDataTuple();
if (!t->ParseFromString(tupleSet->data().tuples(i))) {
LOG(FATAL) << "Failed to parse protobuf";
}
std::shared_ptr<const proto::system::HeronDataTuple> tup(t);
executeTuple(tupleSet->data().stream(), tup);
}
}
if (dataFromSlave_->size() > maxWriteBufferSize_) {
dataToSlave_->stopConsumption();
}
}
void BoltInstance::onTickTimer() {
std::shared_ptr<api::tuple::Tuple> t(new TickTuple(serializer_));
int64_t startTime = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
bolt_->execute(t);
int64_t endTime = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
metrics_->executeTuple(t->getSourceStreamId(), t->getSourceComponent(), endTime - startTime);
collector_->sendOutTuples();
}
} // namespace instance
} // namespace heron