blob: b54a3db08bec1731061f8e2470181761570e4a1a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <map>
#include <list>
#include <unordered_set>
#include <string>
#include <utility>
#include "slave/task-context-impl.h"
#include "basics/basics.h"
#include "proto/messages.h"
#include "network/network.h"
#include "threads/threads.h"
namespace heron {
namespace instance {
TaskContextImpl::TaskContextImpl(int myTaskId)
: myTaskId_(myTaskId), pplan_(nullptr), myComponent_(NULL),
mySpout_(NULL), myBolt_(NULL), myInstance_(NULL),
myMergedConfig_(new api::config::Config()) {
}
TaskContextImpl::~TaskContextImpl() {}
void TaskContextImpl::newPhysicalPlan(std::shared_ptr<proto::system::PhysicalPlan> pplan) {
cleanUp();
pplan_ = pplan;
for (int i = 0; i < pplan_->instances_size(); ++i) {
if (pplan_->instances(i).info().task_id() == myTaskId_) {
myInstance_ = pplan_->mutable_instances(i);
break;
}
}
if (!myInstance_) {
LOG(FATAL) << "There was no instance that matched my id " << myTaskId_;
}
myComponentName_ = myInstance_->info().component_name();
myInstanceId_ = myInstance_->instance_id();
for (int i = 0; i < pplan_->topology().spouts_size(); ++i) {
if (pplan_->topology().spouts(i).comp().name() == myComponentName_) {
mySpout_ = pplan_->mutable_topology()->mutable_spouts(i);
myComponent_ = mySpout_->mutable_comp();
break;
}
}
for (int i = 0; i < pplan_->topology().bolts_size(); ++i) {
if (pplan_->topology().bolts(i).comp().name() == myComponentName_) {
myBolt_ = pplan_->mutable_topology()->mutable_bolts(i);
myComponent_ = myBolt_->mutable_comp();
break;
}
}
if (!mySpout_ && !myBolt_) {
LOG(FATAL) << myComponentName_ << " is neither a spout nor a bolt";
}
if (mySpout_ && myBolt_) {
LOG(FATAL) << myComponentName_ << " is bolt a spout and a bolt";
}
if (mySpout_) {
for (auto output : mySpout_->outputs()) {
myOutputSchema_[output.stream().id()] = output.schema().keys_size();
}
} else {
for (auto output : myBolt_->outputs()) {
myOutputSchema_[output.stream().id()] = output.schema().keys_size();
}
}
myMergedConfig_->insert(pplan_->topology().topology_config());
myMergedConfig_->insert(myComponent_->config());
}
void TaskContextImpl::cleanUp() {
if (pplan_) {
pplan_ = nullptr;
}
myOutputSchema_.clear();
taskToComponentName_.clear();
myComponent_ = NULL;
mySpout_ = NULL;
myBolt_ = NULL;
myInstance_ = NULL;
myMergedConfig_->clear();
}
const std::string& TaskContextImpl::getTopologyId() {
return pplan_->topology().id();
}
const std::string& TaskContextImpl::getComponentName(int taskId) {
return config::PhysicalPlanHelper::GetComponentName(*pplan_, taskId);
}
void TaskContextImpl::getComponentStreams(const std::string& componentName,
std::unordered_set<std::string>& retval) {
config::TopologyConfigHelper::GetComponentStreams(pplan_->topology(), componentName, retval);
}
void TaskContextImpl::getComponentTaskIds(const std::string& componentName,
std::list<int>& retval) {
config::PhysicalPlanHelper::GetComponentTaskIds(*pplan_, componentName, retval);
}
api::tuple::Fields TaskContextImpl::getComponentOutputFields(const std::string& componentName,
const std::string& streamId) {
proto::api::StreamSchema* schema =
config::TopologyConfigHelper::GetStreamSchema(*(pplan_->mutable_topology()),
componentName,
streamId);
std::list<std::string> keys;
if (schema) {
for (auto name : schema->keys()) {
keys.push_back(name.key());
}
}
return api::tuple::Fields(keys);
}
void TaskContextImpl::getComponentSources(const std::string& componentName,
std::map<std::pair<std::string, std::string>, proto::api::Grouping>& retval) {
config::TopologyConfigHelper::GetComponentSources(pplan_->topology(), componentName, retval);
}
void TaskContextImpl::getComponentTargets(const std::string& componentName,
std::map<std::string, std::map<std::string, proto::api::Grouping>>& retval) {
config::TopologyConfigHelper::GetComponentTargets(pplan_->topology(), componentName, retval);
}
void TaskContextImpl::getTaskIdToComponentName(std::map<int, std::string>& retval) {
config::PhysicalPlanHelper::GetTaskIdToComponentName(*pplan_, retval);
}
void TaskContextImpl::getAllComponentNames(std::unordered_set<std::string>& retval) {
config::TopologyConfigHelper::GetAllComponentNames(pplan_->topology(), retval);
}
void TaskContextImpl::log(std::ostringstream& ostr) {
LOG(INFO) << ostr.str();
ostr.str(std::string()); // reset it
}
int TaskContextImpl::getThisTaskId() {
return myTaskId_;
}
const std::string& TaskContextImpl::getThisComponentName() {
return myComponent_->name();
}
api::tuple::Fields TaskContextImpl::getThisOutputFields(const std::string& streamId) {
return getComponentOutputFields(getThisComponentName(), streamId);
}
void TaskContextImpl::getThisStreams(std::unordered_set<std::string>& retval) {
return getComponentStreams(getThisComponentName(), retval);
}
int TaskContextImpl::getThisTaskIndex() {
return myInstance_->info().component_index();
}
void TaskContextImpl::getThisSources(std::map<std::pair<std::string, std::string>,
proto::api::Grouping>& retval) {
return getComponentSources(getThisComponentName(), retval);
}
void TaskContextImpl::getThisTargets(std::map<std::string,
std::map<std::string, proto::api::Grouping>>& retval) {
return getComponentTargets(getThisComponentName(), retval);
}
void TaskContextImpl::setTaskData(const std::string& name, const std::string& value) {
myMergedConfig_->insert(name, value);
}
void TaskContextImpl::getTaskData(const std::string& name, std::string& value) {
if (myMergedConfig_->hasConfig(name)) {
value = myMergedConfig_->get(name);
}
}
} // namespace instance
} // namespace heron