blob: 5f94fa4e04e4a5f1723f4795545ae2caeeba101d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <iostream>
#include <vector>
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "server/dummy_stmgr.h"
using std::shared_ptr;
///////////////////////////// DummyTMasterClient ///////////////////////////////////////////
DummyTMasterClient::DummyTMasterClient(
shared_ptr<EventLoopImpl> eventLoop, const NetworkOptions& _options, const sp_string& stmgr_id,
const sp_string& stmgr_host, sp_int32 stmgr_port, sp_int32 shell_port,
const std::vector<shared_ptr<heron::proto::system::Instance>>& _instances)
: Client(eventLoop, _options),
stmgr_id_(stmgr_id),
stmgr_host_(stmgr_host),
stmgr_port_(stmgr_port),
shell_port_(shell_port),
instances_(_instances) {
InstallResponseHandler(make_unique<heron::proto::tmaster::StMgrRegisterRequest>(),
&DummyTMasterClient::HandleRegisterResponse);
// Setup the call back function to be invoked when retrying
retry_cb_ = [this]() { this->Retry(); };
}
DummyTMasterClient::~DummyTMasterClient() {}
void DummyTMasterClient::HandleRegisterResponse(
void*,
pool_unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response,
NetworkErrorCode) {
}
void DummyTMasterClient::HandleConnect(NetworkErrorCode _status) {
if (_status == OK) {
CreateAndSendRegisterRequest();
} else {
// Retry after some time
AddTimer(retry_cb_, 100);
}
}
void DummyTMasterClient::HandleClose(NetworkErrorCode) {}
void DummyTMasterClient::CreateAndSendRegisterRequest() {
auto request = make_unique<heron::proto::tmaster::StMgrRegisterRequest>();
heron::proto::system::StMgr* stmgr = request->mutable_stmgr();
sp_string cwd;
stmgr->set_id(stmgr_id_);
stmgr->set_host_name(stmgr_host_);
stmgr->set_data_port(stmgr_port_);
stmgr->set_local_endpoint("/unused");
stmgr->set_cwd(cwd);
stmgr->set_pid((sp_int32)ProcessUtils::getPid());
stmgr->set_shell_port(shell_port_);
for (auto iter = instances_.begin(); iter != instances_.end(); ++iter) {
request->add_instances()->CopyFrom(**iter);
}
SendRequest(std::move(request), nullptr);
}
///////////////////////////// DummyStMgr /////////////////////////////////////////////////
DummyStMgr::DummyStMgr(shared_ptr<EventLoopImpl> ss, const NetworkOptions& options,
const sp_string& stmgr_id,
const sp_string& stmgr_host, sp_int32 stmgr_port,
const sp_string& tmaster_host, sp_int32 tmaster_port, sp_int32 shell_port,
const std::vector<shared_ptr<heron::proto::system::Instance>>& _instances)
: Server(ss, options), num_start_bp_(0), num_stop_bp_(0) {
NetworkOptions tmaster_options;
tmaster_options.set_host(tmaster_host);
tmaster_options.set_port(tmaster_port);
tmaster_options.set_max_packet_size(1_MB);
tmaster_options.set_socket_family(PF_INET);
tmaster_client_ = new DummyTMasterClient(ss, tmaster_options, stmgr_id, stmgr_host, stmgr_port,
shell_port, _instances);
InstallRequestHandler(&DummyStMgr::HandleStMgrHelloRequest);
InstallMessageHandler(&DummyStMgr::HandleStartBackPressureMessage);
InstallMessageHandler(&DummyStMgr::HandleStopBackPressureMessage);
}
DummyStMgr::~DummyStMgr() {
tmaster_client_->Stop();
delete tmaster_client_;
}
sp_int32 DummyStMgr::Start() {
if (SP_OK == Server::Start()) {
tmaster_client_->setStmgrPort(get_serveroptions().get_port());
tmaster_client_->Start();
return SP_OK;
} else {
return SP_NOTOK;
}
}
void DummyStMgr::HandleNewConnection(Connection* conn) {}
void DummyStMgr::HandleConnectionClose(Connection*, NetworkErrorCode) {}
void DummyStMgr::HandleStMgrHelloRequest(REQID _id, Connection* _conn,
pool_unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request) {
other_stmgrs_ids_.push_back(_request->stmgr());
heron::proto::stmgr::StrMgrHelloResponse response;
response.mutable_status()->set_status(heron::proto::system::OK);
SendResponse(_id, _conn, response);
}
void DummyStMgr::HandleStartBackPressureMessage(Connection*,
pool_unique_ptr<heron::proto::stmgr::StartBackPressureMessage>) {
++num_start_bp_;
}
void DummyStMgr::HandleStopBackPressureMessage(Connection*,
pool_unique_ptr<heron::proto::stmgr::StopBackPressureMessage>) {
++num_stop_bp_;
}