blob: b5c32d41fffa7be6ab806019b50193c26b455408 [file]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <memory>
#include <string>
#include <utility>
#include <array>
#include "core/logging/LoggerFactory.h"
#include "http/BaseHTTPClient.h"
#include "io/BaseStream.h"
#include "io/NetworkPrioritizer.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::sitetosite {
inline constexpr std::array<char, 4> MAGIC_BYTES = { 'N', 'i', 'F', 'i' };
class PeerStatus {
public:
PeerStatus(utils::Identifier port_id, std::string host, uint16_t port, uint32_t flow_file_count, bool query_for_peers)
: port_id_(port_id),
host_(std::move(host)),
port_(port),
flow_file_count_(flow_file_count),
query_for_peers_(query_for_peers) {
}
PeerStatus(const PeerStatus &other) = default;
PeerStatus(PeerStatus &&other) = default;
PeerStatus& operator=(const PeerStatus &other) = default;
PeerStatus& operator=(PeerStatus &&other) = default;
const utils::Identifier &getPortId() const {
return port_id_;
}
const std::string &getHost() const {
return host_;
}
[[nodiscard]] uint16_t getPort() const {
return port_;
}
[[nodiscard]] uint32_t getFlowFileCount() const {
return flow_file_count_;
}
[[nodiscard]] bool getQueryForPeers() const {
return query_for_peers_;
}
protected:
utils::Identifier port_id_;
std::string host_;
uint16_t port_;
uint32_t flow_file_count_;
bool query_for_peers_;
};
class SiteToSitePeer : public io::BaseStreamImpl {
public:
SiteToSitePeer(std::unique_ptr<io::BaseStream> injected_socket, const std::string& host, uint16_t port, const std::string& ifc)
: SiteToSitePeer(host, port, ifc) {
stream_ = std::move(injected_socket);
}
SiteToSitePeer(const std::string &host, uint16_t port, const std::string &ifc)
: host_(host),
port_(port),
url_("nifi://" + host_ + ":" + std::to_string(port_)),
local_network_interface_(io::NetworkInterface(ifc, nullptr)) {
}
SiteToSitePeer(SiteToSitePeer &&ss) = delete;
SiteToSitePeer& operator=(SiteToSitePeer&& other) = delete;
SiteToSitePeer(const SiteToSitePeer &parent) = delete;
SiteToSitePeer &operator=(const SiteToSitePeer &parent) = delete;
~SiteToSitePeer() override {
close();
}
[[nodiscard]] std::string getURL() const {
return url_;
}
void setInterface(const std::string &ifc) {
local_network_interface_ = io::NetworkInterface(ifc, nullptr);
}
[[nodiscard]] std::string getInterface() const {
return local_network_interface_.getInterface();
}
void setHostName(const std::string& host) {
host_ = host;
url_ = "nifi://" + host_ + ":" + std::to_string(port_);
}
void setPort(uint16_t port) {
port_ = port;
url_ = "nifi://" + host_ + ":" + std::to_string(port_);
}
[[nodiscard]] std::string getHostName() const {
return host_;
}
[[nodiscard]] uint16_t getPort() const {
return port_;
}
void setTimeout(std::chrono::milliseconds time) {
timeout_ = time;
}
[[nodiscard]] std::chrono::milliseconds getTimeout() const {
return timeout_.load();
}
void setHTTPProxy(const http::HTTPProxy &proxy) {
proxy_ = proxy;
}
[[nodiscard]] http::HTTPProxy getHTTPProxy() const {
return proxy_;
}
void setStream(std::unique_ptr<io::BaseStream> stream) {
stream_ = std::move(stream);
}
[[nodiscard]] io::BaseStream* getStream() const {
return stream_.get();
}
using BaseStream::write;
using BaseStream::read;
size_t write(const uint8_t* data, size_t len) override {
return stream_->write(data, len);
}
size_t read(std::span<std::byte> data) override {
return stream_->read(data);
}
bool open();
void close() override;
private:
std::unique_ptr<io::BaseStream> stream_;
std::string host_;
uint16_t port_;
std::string url_;
std::atomic<std::chrono::milliseconds> timeout_{30s};
io::NetworkInterface local_network_interface_;
http::HTTPProxy proxy_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SiteToSitePeer>::getLogger();
};
} // namespace org::apache::nifi::minifi::sitetosite