blob: ad35f069577532812c4c39acdfa51ae6b949789b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <iostream>
#include <string>
#include <vector>
#include "gflags/gflags.h"
#include "manager/stmgr.h"
#include "statemgr/heron-statemgr.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
// Command-line flags (gflags) for the stream manager binary. Each DEFINE_*
// creates a FLAGS_<name> variable that main() reads after flag parsing.

// Topology identity and the file holding its serialized definition.
DEFINE_string(topology_name, "", "Name of the topology");
DEFINE_string(topology_id, "", "Id of the topology");
DEFINE_string(topologydefn_file, "", "Name of the topology defn file");
// State manager location. main() replaces the literal value "LOCALMODE" for
// zkhostportlist with an empty string before handing it to the StMgr.
DEFINE_string(zkhostportlist, "", "Location of the zk");
DEFINE_string(zkroot, "", "Root of the zk");
// Identity of this stream manager and the instances in its container.
// instance_ids is split on ',' in main().
DEFINE_string(stmgr_id, "", "My Id");
DEFINE_string(instance_ids, "", "Comma seperated list of instance ids in my container");
// Host name and ports passed to the StMgr constructor.
DEFINE_string(myhost, "", "The hostname that I'm running");
DEFINE_int32(data_port, 0, "The port used for inter-container traffic");
DEFINE_int32(local_data_port, 0, "The port used for intra-container traffic");
DEFINE_int32(metricsmgr_port, 0, "The port of the local metricsmgr");
DEFINE_int32(shell_port, 0, "The port of the local heron shell");
// Heron internals config file and its override file, both given to
// HeronInternalsConfigReader::Create() in main().
DEFINE_string(config_file, "", "The heron internals config file");
DEFINE_string(override_config_file, "", "The override heron internals config file");
// Local checkpoint manager id and port.
DEFINE_string(ckptmgr_id, "", "The id of the local ckptmgr");
DEFINE_int32(ckptmgr_port, 0, "The port of the local ckptmgr");
// Forwarded unchanged to the StMgr constructor; defaults to "disabled".
DEFINE_string(metricscachemgr_mode, "disabled", "MetricsCacheMgr mode, default `disabled`");
/**
 * Entry point of the stream manager process.
 *
 * Steps, in order:
 *   1. Parse the command-line flags.
 *   2. Create the event loop and the heron internals config reader.
 *   3. Initialize the common heron runtime with the program name and stmgr id.
 *   4. Read and validate the serialized topology definition.
 *   5. Construct and initialize the StMgr, then run the event loop.
 *
 * @param argc number of command-line arguments
 * @param argv command-line arguments; recognized flags are stripped by gflags
 * @return 0 once the event loop returns
 */
int main(int argc, char* argv[]) {
  // remove_flags == true: parsed flags are removed from argc/argv.
  gflags::ParseCommandLineFlags(&argc, &argv, true);

  // "LOCALMODE" is a sentinel value; the StMgr is given an empty
  // host/port list in that case.
  if (FLAGS_zkhostportlist == "LOCALMODE") {
    FLAGS_zkhostportlist = "";
  }

  std::vector<std::string> instances = StrUtils::split(FLAGS_instance_ids, ",");

  auto ss = std::make_shared<EventLoopImpl>();

  // Create the heron-internals-config-reader from the local config file and
  // its override file. This must happen before any
  // HeronInternalsConfigReader::Instance() call below.
  heron::config::HeronInternalsConfigReader::Create(ss,
                                                    FLAGS_config_file,
                                                    FLAGS_override_config_file);

  heron::common::Initialize(argv[0], FLAGS_stmgr_id.c_str());

  // Read the topology definition file and deserialize it.
  auto topology = std::make_shared<heron::proto::api::Topology>();
  sp_string contents = FileUtils::readAll(FLAGS_topologydefn_file);

  // ParseFromString() reports malformed input through its return value. A
  // failed parse can still leave a message whose required fields happen to be
  // set, so checking IsInitialized() alone is not enough to detect a corrupt
  // file. Check both before going any further.
  const bool parsed = topology->ParseFromString(contents);
  if (!parsed || !topology->IsInitialized()) {
    LOG(FATAL) << "Corrupt topology defn file " << FLAGS_topologydefn_file;
  }

  // Back pressure watermarks: the configured values (in MB, per the getter
  // names) are scaled by the 1_MB literal.
  sp_int64 high_watermark = heron::config::HeronInternalsConfigReader::Instance()
                                ->GetHeronStreammgrNetworkBackpressureHighwatermarkMb() *
                            1_MB;
  sp_int64 low_watermark = heron::config::HeronInternalsConfigReader::Instance()
                               ->GetHeronStreammgrNetworkBackpressureLowwatermarkMb() *
                           1_MB;

  heron::stmgr::StMgr mgr(ss, FLAGS_myhost, FLAGS_data_port, FLAGS_local_data_port,
                          FLAGS_topology_name, FLAGS_topology_id, topology, FLAGS_stmgr_id,
                          instances, FLAGS_zkhostportlist, FLAGS_zkroot, FLAGS_metricsmgr_port,
                          FLAGS_shell_port, FLAGS_ckptmgr_port, FLAGS_ckptmgr_id,
                          high_watermark, low_watermark, FLAGS_metricscachemgr_mode);
  mgr.Init();

  // Runs the event loop on this thread; the process exits with 0 only after
  // loop() returns.
  ss->loop();
  return 0;
}