blob: 3d253951b85585b6c5617547d065eb0baf3a1436 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
#include <set>
#include <string>
#include <vector>
#include "network/network_error.h"
#include "proto/messages.h"
#include "network/network.h"
#include "basics/basics.h"
namespace heron {
namespace stmgr {
using std::shared_ptr;
class TMasterClient : public Client {
TMasterClient(shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
const sp_string& _stmgr_id,
const sp_string& _stmgr_host, sp_int32 _data_port, sp_int32 _local_data_port,
sp_int32 _shell_port,
VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch,
VCallback<sp_string> _stateful_checkpoint_watch,
VCallback<sp_string, sp_int64> _restore_topology_watch,
VCallback<sp_string> _start_stateful_watch);
virtual ~TMasterClient();
// Told by the upper layer to disconnect and self destruct
void Die();
// Sets the instances that belong to us
void SetInstanceInfo(const std::vector<proto::system::Instance*>& _instances);
// returns the tmaster address "host:port" form.
sp_string getTmasterHostPort();
// Send a InstanceStateStored message to tmaster
void SavedInstanceState(const proto::system::Instance& _instance,
const std::string& _checkpoint_id);
// Send RestoreTopologyStateResponse to tmaster
void SendRestoreTopologyStateResponse(proto::system::StatusCode _status,
const std::string& _checkpoint_id,
sp_int64 _txid);
// Send ResetTopologyState message to tmaster
void SendResetTopologyState(const std::string& _dead_stmgr,
int32_t _dead_instance,
const std::string& _reason);
virtual void HandleConnect(NetworkErrorCode status);
virtual void HandleClose(NetworkErrorCode status);
void HandleRegisterResponse(void*, unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
void HandleHeartbeatResponse(void*, unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
void HandleNewAssignmentMessage(unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message);
void HandleStatefulCheckpointMessage(
unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message);
void HandleRestoreTopologyStateRequest(
unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message);
void HandleStartStmgrStatefulProcessing(
unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
void OnReConnectTimer();
void OnHeartbeatTimer();
void SendRegisterRequest();
void SendHeartbeatRequest();
void CleanInstances();
sp_string stmgr_id_;
sp_string stmgr_host_;
sp_int32 data_port_;
sp_int32 local_data_port_;
sp_int32 shell_port_;
// Set of instances to be reported to tmaster
std::set<unique_ptr<proto::system::Instance>> instances_;
bool to_die_;
// We invoke this callback upon a new physical plan from tmaster
VCallback<shared_ptr<proto::system::PhysicalPlan>> pplan_watch_;
// We invoke this callback upon receiving a checkpoint message from tmaster
// passing in the checkpoint id
VCallback<sp_string> stateful_checkpoint_watch_;
// We invoke this callback upon receiving a restore topology message from tmaster
// passing in the checkpoint id and the txid
VCallback<sp_string, sp_int64> restore_topology_watch_;
// We invoke this callback upon receiving a StartStatefulProcessing message from tmaster
// passing in the checkpoint id
VCallback<sp_string> start_stateful_watch_;
// Configs to be read
sp_int32 reconnect_tmaster_interval_sec_;
sp_int32 stream_to_tmaster_heartbeat_interval_sec_;
sp_int64 reconnect_timer_id;
sp_int64 heartbeat_timer_id;
// Counter for reconnect attempts
sp_int32 reconnect_attempts_;
sp_int32 reconnect_max_attempt_;
// Permanent timer callbacks
VCallback<> reconnect_timer_cb;
VCallback<> heartbeat_timer_cb;
} // namespace stmgr
} // namespace heron