blob: 3f60b50f45ea1ccf7409166fe4c962aaf403dafa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef HERON_API_TOPOLOGY_TOPOLOGY_BUILDER_H_
#define HERON_API_TOPOLOGY_TOPOLOGY_BUILDER_H_
#include <stdlib.h>
#include <time.h>
#include <map>
#include <string>
#include "proto/messages.h"
#include "bolt/irich-bolt.h"
#include "spout/irich-spout.h"
#include "config/config.h"
#include "topology/bolt-declarer.h"
#include "topology/spout-declarer.h"
namespace heron {
namespace api {
namespace topology {
/**
* TopologyBuilder exposes the C++ API for specifying a topology for Heron
* to execute. TopologyBuilder greatly eases the process of creating topologies.
* The template for creating and submitting a topology looks something like:
* <p>
* <pre>
* auto builder = std::make_shared<TopologyBuilder>();
*
* builder->setSpout("1", std::make_shared<TestWordSpout>(true), "createTestWordSpout", 5);
* builder->setSpout("2", std::make_shared<TestWordSpout>(true), "createTestWordSpout", 3);
* builder->setBolt("3", std::make_shared<TestWordCounter>(), "createTestWordCounter", 3)
* ->fieldsGrouping("1", std::make_shared<Fields>("word"))
* ->fieldsGrouping("2", std::make_shared<Fields>("word"));
* builder->setBolt("4", std::make_shared<TestGlobalCount>(), "createTestGlobalCount")
* ->globalGrouping("1");
*
* auto conf = std::make_shared<Config>();
* conf->setStMgrs(4);
*
* HeronSubmitter::submitTopology(builder->createTopology("mytopology", conf));
* <p>The pattern for TopologyBuilder is to map component ids to components using the setSpout
* and setBolt methods. Those methods return objects that are then used to declare
* the inputs for that component.</p>
*/
class TopologyBuilder {
public:
TopologyBuilder() : topology_(nullptr) { }
~TopologyBuilder() { delete topology_; }
const proto::api::Topology& createTopology(const std::string& name,
std::shared_ptr<config::Config> userConfig) {
if (name.empty()) {
throw std::invalid_argument("Failed to build topology; missing necessary info");
}
topology_ = new proto::api::Topology();
srand(time(nullptr));
std::ostringstream ss;
ss << rand();
std::string topologyId = name + ss.str();
topology_->set_name(name);
topology_->set_id(topologyId);
topology_->set_state(proto::api::TopologyState::RUNNING);
addDefaultTopologyConfig(userConfig);
userConfig->setTopologyName(name);
userConfig->dump(topology_->mutable_topology_config());
for (auto& kv : spouts_) {
kv.second->dump(topology_);
}
for (auto& kv : bolts_) {
kv.second->dump(topology_);
}
return *topology_;
}
/**
* Define a new bolt in this topology with parallelism of one.
*
* @param name the name of this component. This id is referenced by other components
* that want to consume this bolt's outputs.
* @param bolt the bolt
* @param constructor the extern "C" style function that can be used to create the bolt
* @return use the returned object to declare the inputs to this component
*/
std::shared_ptr<topology::BoltDeclarer> setBolt(const std::string& name,
std::shared_ptr<bolt::IRichBolt> bolt,
const std::string& constructor) {
return setBolt(name, bolt, constructor, 1);
}
/**
* Define a new bolt in this topology with the specified amount of parallelism.
*
* @param name the name of this component. This id is referenced by other components
* that want to consume this bolt's outputs.
* @param bolt the bolt
* @param constructor the extern "C" style function that can be used to create the bolt
* @param parallelismHint the number of tasks that should be assigned to execute this bolt.
* Each task will run in a seperate process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*/
std::shared_ptr<topology::BoltDeclarer> setBolt(const std::string& name,
std::shared_ptr<bolt::IRichBolt> bolt,
const std::string& constructor,
int parallelism) {
validateComponentName(name);
auto b = std::make_shared<topology::BoltDeclarer>(name, constructor, bolt, parallelism);
bolts_[name] = b;
return b;
}
/**
* Define a new spout in this topology.
*
* @param name the name of this component. This id is referenced by other components
* that want to consume this spout's outputs.
* @param spout the spout
* @param constructor the extern "C" style function that can be used to create the spout
*/
std::shared_ptr<SpoutDeclarer> setSpout(const std::string& name,
std::shared_ptr<spout::IRichSpout> spout,
const std::string& constructor) {
return setSpout(name, spout, constructor, 1);
}
/**
* Define a new spout in this topology.
*
* @param name the name of this component. This id is referenced by other components
* that want to consume this spout's outputs.
* @param spout the spout
* @param constructor the extern "C" style function that can be used to create the spout
* @param parallelismHint the number of tasks that should be assigned to execute this spout.
* Each task will run in a seperate process somewhere around the cluster
*/
std::shared_ptr<SpoutDeclarer> setSpout(const std::string& name,
std::shared_ptr<spout::IRichSpout> spout,
const std::string& constructor,
int parallelismHint) {
validateComponentName(name);
auto b = std::make_shared<topology::SpoutDeclarer>(name, constructor,
spout, parallelismHint);
spouts_[name] = b;
return b;
}
private:
void validateComponentName(const std::string& componentName) {
if (componentName.find(',') != std::string::npos) {
throw std::invalid_argument("Component name should not contain comma(,)");
}
if (componentName.find(':') != std::string::npos) {
throw std::invalid_argument("Component name should not contain colon(:)");
}
validateUnusedName(componentName);
}
void validateUnusedName(const std::string& componentName) {
if (spouts_.find(componentName) != spouts_.end()) {
throw std::invalid_argument("A Spout has already been declared in the same name");
}
if (bolts_.find(componentName) != bolts_.end()) {
throw std::invalid_argument("A Bolt has already been declared in the same name");
}
}
void addDefaultTopologyConfig(std::shared_ptr<config::Config> userConfig) {
if (!userConfig->hasConfig(config::Config::TOPOLOGY_DEBUG)) {
userConfig->setDebug(false);
}
if (!userConfig->hasConfig(config::Config::TOPOLOGY_STMGRS)) {
userConfig->setNumStmgrs(1);
}
if (!userConfig->hasConfig(config::Config::TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
userConfig->setMessageTimeoutSecs(30);
}
if (!userConfig->hasConfig(config::Config::TOPOLOGY_COMPONENT_PARALLELISM)) {
userConfig->setComponentParallelism(1);
}
if (!userConfig->hasConfig(config::Config::TOPOLOGY_MAX_SPOUT_PENDING)) {
userConfig->setMaxSpoutPending(100);
}
if (!userConfig->hasConfig(config::Config::TOPOLOGY_RELIABILITY_MODE)) {
userConfig->setTopologyReliabilityMode(config::Config::ATMOST_ONCE);
}
if (!userConfig->hasConfig(config::Config::TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
userConfig->setEnableMessageTimeouts(true);
}
}
std::map<std::string, std::shared_ptr<BoltDeclarer>> bolts_;
std::map<std::string, std::shared_ptr<SpoutDeclarer>> spouts_;
proto::api::Topology* topology_;
};
} // namespace topology
} // namespace api
} // namespace heron
#endif // HERON_API_TOPOLOGY_TOPOLOGY_BUILDER_H_