/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "manager/stmgr-clientmgr.h"
#include <iostream>
#include <map>
#include <unordered_set>
#include "manager/stmgr.h"
#include "manager/stmgr-client.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
#include "metrics/metrics.h"

namespace heron {
namespace stmgr {

using std::make_shared;

// New connections made with other stream managers.
const sp_string METRIC_STMGR_NEW_CONNECTIONS = "__stmgr_new_connections";

StMgrClientMgr::StMgrClientMgr(shared_ptr<EventLoop> eventLoop, const sp_string& _topology_name,
                             const sp_string& _topology_id, const sp_string& _stmgr_id,
                             StMgr* _stream_manager,
                             shared_ptr<heron::common::MetricsMgrSt> const& _metrics_manager_client,
                             sp_int64 _high_watermark, sp_int64 _low_watermark,
                             bool _droptuples_upon_backpressure)
    : topology_name_(_topology_name),
      topology_id_(_topology_id),
      stmgr_id_(_stmgr_id),
      eventLoop_(eventLoop),
      stream_manager_(_stream_manager),
      metrics_manager_client_(_metrics_manager_client),
      high_watermark_(_high_watermark),
      low_watermark_(_low_watermark),
      droptuples_upon_backpressure_(_droptuples_upon_backpressure) {
  stmgr_clientmgr_metrics_ = make_shared<heron::common::MultiCountMetric>();
  metrics_manager_client_->register_metric("__clientmgr", stmgr_clientmgr_metrics_);
}

StMgrClientMgr::~StMgrClientMgr() {
  // This should not be called
  metrics_manager_client_->unregister_metric("__clientmgr");
  clients_.clear();
}

void StMgrClientMgr::StartConnections(proto::system::PhysicalPlan const& _pplan) {
  // TODO(vikasr) : Currently we establish connections with all streammanagers
  // In the next iteration we might want to make it better
  std::unordered_set<sp_string> all_stmgrs;
  for (sp_int32 i = 0; i < _pplan.stmgrs_size(); ++i) {
    const proto::system::StMgr& s = _pplan.stmgrs(i);
    if (s.id() == stmgr_id_) {
      continue;  // dont want to connect to ourselves
    }
    all_stmgrs.insert(s.id());
    if (clients_.find(s.id()) != clients_.end()) {
      // We already have a connection for this stmgr.
      // Just make sure we have it for the same host/port
      const NetworkOptions& o = clients_[s.id()]->get_clientoptions();
      if (o.get_host() != s.host_name() || o.get_port() != s.data_port()) {
        LOG(INFO) << "Stmgr " << s.id() << " changed from " << o.get_host() << ":" << o.get_port()
                  << " to " << s.host_name() << ":" << s.data_port();
        // This stmgr has actually moved to a different host/port
        clients_[s.id()]->Quit();  // this will delete itself.
        clients_[s.id()] = CreateClient(s.id(), s.host_name(), s.data_port());
      } else {
        // This stmgr has remained the same. Don't do anything
      }
    } else {
      // We don't have any connection to this stmgr.
      LOG(INFO) << "Stmgr " << s.id() << " came on " << s.host_name() << ":" << s.data_port();
      clients_[s.id()] = CreateClient(s.id(), s.host_name(), s.data_port());
    }
  }

  // We need to remove any unused ports
  std::unordered_set<sp_string> to_remove;
  for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) {
    if (all_stmgrs.find(iter->first) == all_stmgrs.end()) {
      // This stmgr is no longer there in the physical map
      to_remove.insert(iter->first);
    }
  }

  // Now go over to_remove to remove all the unused stmgrs
  for (auto iter = to_remove.begin(); iter != to_remove.end(); ++iter) {
    LOG(INFO) << "Stmgr " << *iter << " no longer required";
    clients_[*iter]->Quit();  // This will delete itself.
    clients_.erase(*iter);
  }
}

bool StMgrClientMgr::DidAnnounceBackPressure() {
  return stream_manager_->DidAnnounceBackPressure();
}

shared_ptr<StMgrClient> StMgrClientMgr::CreateClient(const sp_string& _other_stmgr_id,
                                          const sp_string& _hostname, sp_int32 _port) {
  stmgr_clientmgr_metrics_->scope(METRIC_STMGR_NEW_CONNECTIONS)->incr();
  NetworkOptions options;
  options.set_host(_hostname);
  options.set_port(_port);
  options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
                                  ->GetHeronStreammgrNetworkOptionsMaximumPacketMb() * 1_MB);
  options.set_high_watermark(high_watermark_);
  options.set_low_watermark(low_watermark_);
  options.set_socket_family(PF_INET);
  auto client = make_shared<StMgrClient>(eventLoop_, options, topology_name_, topology_id_,
                                        stmgr_id_, _other_stmgr_id, this, metrics_manager_client_,
                                        droptuples_upon_backpressure_);
  client->Start();
  return client;
}

bool StMgrClientMgr::SendTupleStreamMessage(sp_int32 _task_id, const sp_string& _stmgr_id,
                                            const proto::system::HeronTupleSet2& _msg) {
  auto iter = clients_.find(_stmgr_id);
  CHECK(iter != clients_.end());

  // Acquire the message
  proto::stmgr::TupleStreamMessage* out = nullptr;
  out = __global_protobuf_pool_acquire__(out);
  out->set_task_id(_task_id);
  out->set_src_task_id(_msg.src_task_id());
  sp_int32 length = 0;
  if (_msg.has_data()) {
    length = _msg.data().tuples_size();
  }
  out->set_num_tuples(length);
  _msg.SerializePartialToString(out->mutable_set());

  bool retval = clients_[_stmgr_id]->SendTupleStreamMessage(*out);

  // Release the message
  __global_protobuf_pool_release__(out);

  return retval;
}

void StMgrClientMgr::SendDownstreamStatefulCheckpoint(const sp_string& _stmgr_id,
                           proto::ckptmgr::DownstreamStatefulCheckpoint* _message) {
  auto iter = clients_.find(_stmgr_id);
  CHECK(iter != clients_.end());
  iter->second->SendDownstreamStatefulCheckpoint(_message);
}

void StMgrClientMgr::StartBackPressureOnServer(const sp_string& _other_stmgr_id) {
  stream_manager_->StartBackPressureOnServer(_other_stmgr_id);
}

void StMgrClientMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) {
  // Call the StMgrServers removeBackPressure method
  stream_manager_->StopBackPressureOnServer(_other_stmgr_id);
}

void StMgrClientMgr::SendStartBackPressureToOtherStMgrs() {
  for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) {
    iter->second->SendStartBackPressureMessage();
  }
}

void StMgrClientMgr::SendStopBackPressureToOtherStMgrs() {
  for (auto iter = clients_.begin(); iter != clients_.end(); ++iter) {
    iter->second->SendStopBackPressureMessage();
  }
}

void StMgrClientMgr::HandleDeadStMgrConnection(const sp_string& _dead_stmgr) {
  stream_manager_->HandleDeadStMgrConnection(_dead_stmgr);
}

void StMgrClientMgr::HandleStMgrClientRegistered() {
  if (AllStMgrClientsRegistered()) {
    stream_manager_->HandleAllStMgrClientsRegistered();
  }
}

void StMgrClientMgr::CloseConnectionsAndClear() {
  for (auto kv : clients_) {
    kv.second->Quit();  // It will delete itself
  }
  clients_.clear();
}

bool StMgrClientMgr::AllStMgrClientsRegistered() {
  for (auto kv : clients_) {
    if (!kv.second->IsConnected()) {
      return false;
    }
    if (!kv.second->IsRegistered()) {
      return false;
    }
  }
  return true;
}
}  // namespace stmgr
}  // namespace heron
