blob: 3a5a75f8ad1eff0df2001a8fd25a6b1dd36917c1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_CLIENTMGR_H_
#define SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_CLIENTMGR_H_
#include <unordered_map>
#include "proto/messages.h"
#include "network/network.h"
#include "basics/basics.h"
namespace heron {
namespace common {
class MetricsMgrSt;
class MultiCountMetric;
}
}
namespace heron {
namespace stmgr {
using std::shared_ptr;
class StMgr;
class StMgrClient;
class StMgrClientMgr {
public:
StMgrClientMgr(shared_ptr<EventLoop> eventLoop, const sp_string& _topology_name,
const sp_string& _topology_id, const sp_string& _stmgr_id, StMgr* _stream_manager,
shared_ptr<heron::common::MetricsMgrSt> const& _metrics_manager_client,
sp_int64 _high_watermark, sp_int64 _low_watermark,
bool _droptuples_upon_backpressure);
virtual ~StMgrClientMgr();
// Start the appropriate clients based on a new physical plan
virtual void StartConnections(proto::system::PhysicalPlan const& _pplan);
// return true if we are successful in sending the message. false otherwise
bool SendTupleStreamMessage(sp_int32 _task_id,
const sp_string& _stmgr_id,
const proto::system::HeronTupleSet2& _msg);
// Forward the call to the stmgr
void StartBackPressureOnServer(const sp_string& _other_stmgr_id);
// Forward the call to the stmgr
void StopBackPressureOnServer(const sp_string& _other_stmgr_id);
// Used by the server to tell the client to send the back pressure related
// messages
void SendStartBackPressureToOtherStMgrs();
void SendStopBackPressureToOtherStMgrs();
bool DidAnnounceBackPressure();
// Called by StMgrClient when its connection closes
void HandleDeadStMgrConnection(const sp_string& _stmgr_id);
// Called by StMgrClient when it successfully registers
void HandleStMgrClientRegistered();
void SendDownstreamStatefulCheckpoint(const sp_string& _stmgr_id,
proto::ckptmgr::DownstreamStatefulCheckpoint* _message);
// Interface to close all connections
virtual void CloseConnectionsAndClear();
// Check if all clients are registered
virtual bool AllStMgrClientsRegistered();
private:
shared_ptr<StMgrClient> CreateClient(const sp_string& _other_stmgr_id,
const sp_string& _host_name, sp_int32 _port);
// map of stmgrid to its client
std::unordered_map<sp_string, shared_ptr<StMgrClient>> clients_;
sp_string topology_name_;
sp_string topology_id_;
sp_string stmgr_id_;
shared_ptr<EventLoop> eventLoop_;
StMgr* stream_manager_;
// Metrics
shared_ptr<heron::common::MetricsMgrSt> metrics_manager_client_;
shared_ptr<heron::common::MultiCountMetric> stmgr_clientmgr_metrics_;
sp_int64 high_watermark_;
sp_int64 low_watermark_;
bool droptuples_upon_backpressure_;
};
} // namespace stmgr
} // namespace heron
#endif // SRC_CPP_SVCS_STMGR_SRC_MANAGER_STMGR_CLIENTMGR_H_