blob: 656f71e65470f6ee5747076f16b1aca7de94c29b [file]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <optional>
#include <expected>
#include "Peer.h"
#include "SiteToSite.h"
#include "core/ProcessSession.h"
#include "minifi-cpp/core/ProcessContext.h"
namespace org::apache::nifi::minifi {
namespace test {
// Forward declaration only: SiteToSiteClient (below) names this class as a friend,
// which gives it access to the client's protected and private members.
class SiteToSiteClientTestAccessor;
} // namespace test
namespace sitetosite {
/**
 * Bundles a payload string with the transaction it belongs to and an optional
 * set of string attributes.
 *
 * NOTE: `payload` is a reference member bound to the string passed to the
 * constructor; no copy is made, so the referenced string has to stay alive
 * for as long as the packet is used.
 */
struct DataPacket {
  /** Creates a packet without attributes (the attribute map starts out empty). */
  DataPacket(std::shared_ptr<Transaction> transaction, const std::string &payload)
      : DataPacket(std::move(transaction), std::map<std::string, std::string>{}, payload) {
  }

  /** Creates a packet carrying the given attributes alongside the payload. */
  DataPacket(std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> attributes, const std::string &payload)
      : attributes(std::move(attributes)),
        transaction(std::move(transaction)),
        payload(payload) {
  }

  std::map<std::string, std::string> attributes;
  std::shared_ptr<Transaction> transaction;
  const std::string& payload;  // non-owning, see the lifetime note on the struct
};
/**
 * A response code paired with a message string.
 * A default-constructed response carries UNRECOGNIZED_RESPONSE_CODE and an empty message.
 */
struct SiteToSiteResponse {
  ResponseCode code{ResponseCode::UNRECOGNIZED_RESPONSE_CODE};
  std::string message;
};
/**
 * Abstract base class of the site-to-site clients.
 *
 * It owns the peer object, holds the configuration set through the setters
 * below and exposes transfer() as the entry point for moving flow files.
 * Peer discovery, payload transmission, bootstrap/establish/tearDown and
 * transaction creation are pure virtual and must be supplied by a subclass.
 *
 * Instances can be neither copied nor moved.
 */
class SiteToSiteClient {
 public:
  /** Takes ownership of the (never null) peer. */
  explicit SiteToSiteClient(gsl::not_null<std::unique_ptr<SiteToSitePeer>> peer)
      : peer_(std::move(peer)) {
  }

  SiteToSiteClient(const SiteToSiteClient&) = delete;
  SiteToSiteClient(SiteToSiteClient&&) = delete;
  SiteToSiteClient& operator=(const SiteToSiteClient&) = delete;
  SiteToSiteClient& operator=(SiteToSiteClient&&) = delete;

  virtual ~SiteToSiteClient() = default;

  /** Returns the list of peer statuses, or std::nullopt (subclass-defined; presumably on failure - confirm in the implementations). */
  virtual std::optional<std::vector<PeerStatus>> getPeerList() = 0;

  /** Transmits a raw payload together with its attributes; the meaning of the bool result is defined by the subclass. */
  virtual bool transmitPayload(core::ProcessContext& context, const std::string &payload, const std::map<std::string, std::string>& attributes) = 0;

  /**
   * Dispatches on the transfer direction: SEND goes to transferFlowFiles(),
   * every other direction goes to receiveFlowFiles().
   * @return the result of the selected operation
   */
  bool transfer(TransferDirection direction, core::ProcessContext& context, core::ProcessSession& session) {
    if (direction == TransferDirection::SEND) {
      return transferFlowFiles(context, session);
    }
    return receiveFlowFiles(context, session);
  }

  void setPortId(const utils::Identifier& id) {
    port_id_ = id;
  }

  void setIdleTimeout(std::chrono::milliseconds timeout) {
    idle_timeout_ = timeout;
  }

  [[nodiscard]] utils::Identifier getPortId() const {
    return port_id_;
  }

  // const-qualified so the logger is reachable through const references as well,
  // consistent with getPortId(); the accessor does not modify the client.
  [[nodiscard]] const std::shared_ptr<core::logging::Logger> &getLogger() const {
    return logger_;
  }

  void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextServiceInterface> &context_service) {
    ssl_context_service_ = context_service;
  }

  // The following four setters store into std::atomic members.
  void setUseCompression(bool use_compression) {
    use_compression_ = use_compression;
  }

  void setBatchSize(uint64_t size) {
    batch_size_ = size;
  }

  void setBatchCount(uint64_t count) {
    batch_count_ = count;
  }

  void setBatchDuration(std::chrono::milliseconds duration) {
    batch_duration_ = duration;
  }

  // Virtual, so a subclass may override how the timeout is applied; the base version only stores it.
  virtual void setTimeout(std::chrono::milliseconds timeout) {
    timeout_ = timeout;
  }

 protected:
  friend class test::SiteToSiteClientTestAccessor;

  // Hooks that every concrete client has to implement.
  virtual bool bootstrap() = 0;
  virtual bool establish() = 0;
  virtual std::shared_ptr<Transaction> createTransaction(TransferDirection direction) = 0;
  virtual void tearDown() = 0;

  // Overridable operations whose definitions live outside this header.
  virtual void deleteTransaction(const utils::Identifier &transaction_id);
  virtual std::optional<SiteToSiteResponse> readResponse(const std::shared_ptr<Transaction> &transaction);
  virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction, const SiteToSiteResponse& response);
  // NOTE(review): the meaning of the two returned counters is not visible here - confirm in the implementation.
  virtual std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session);

  // Non-virtual helpers for the send path (defined out of line).
  bool initializeSend(const std::shared_ptr<Transaction>& transaction);
  bool writeAttributesInSendTransaction(io::OutputStream& stream, const std::string& transaction_id_str, const std::map<std::string, std::string>& attributes);
  void finalizeSendTransaction(const std::shared_ptr<Transaction>& transaction, uint64_t sent_bytes);
  bool sendPacket(const DataPacket& packet);
  bool sendFlowFile(const std::shared_ptr<Transaction>& transaction, core::FlowFile& flow_file, core::ProcessSession& session);

  // Operations on a transaction identified by its id (defined out of line).
  void cancel(const utils::Identifier &transaction_id);
  bool complete(core::ProcessContext& context, const utils::Identifier &transaction_id);
  void error(const utils::Identifier &transaction_id);
  bool confirm(const utils::Identifier &transaction_id);

  void handleTransactionError(const std::shared_ptr<Transaction>& transaction, core::ProcessContext& context, const std::exception& exception);

  PeerState peer_state_{PeerState::IDLE};
  utils::Identifier port_id_;
  std::chrono::milliseconds idle_timeout_{15s};
  gsl::not_null<std::unique_ptr<SiteToSitePeer>> peer_;
  // Transactions looked up by their identifier.
  std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
  std::chrono::nanoseconds batch_send_nanos_{5s};

  // current_version_ starts out as the first entry of supported_versions_ (index 0).
  // The initialisers below rely on the declaration order of these members, so keep it intact.
  const std::vector<uint32_t> supported_versions_ = {5, 4, 3, 2, 1};
  uint32_t current_version_index_{0};
  uint32_t current_version_{supported_versions_[current_version_index_]};

  // Same scheme for the codec version.
  const std::vector<uint32_t> supported_codec_versions_ = {1};
  uint32_t current_codec_version_index_{0};
  uint32_t current_codec_version_{supported_codec_versions_[current_codec_version_index_]};

  std::shared_ptr<minifi::controllers::SSLContextServiceInterface> ssl_context_service_;

  // Settings written by the public setters; held in atomics.
  std::atomic_bool use_compression_{false};
  std::atomic<uint64_t> batch_count_{0};
  std::atomic<uint64_t> batch_size_{0};
  std::atomic<std::chrono::milliseconds> batch_duration_{0s};
  std::atomic<std::chrono::milliseconds> timeout_{0s};

 private:
  /** Result of reading a flow file header: its attributes, the size of the data that follows and an eof flag. */
  struct ReceiveFlowFileHeaderResult {
    std::map<std::string, std::string> attributes;
    size_t flow_file_data_size = 0;
    bool eof{false};
  };

  static const ResponseCodeContext* getResponseCodeContext(ResponseCode code);

  // Targets of transfer(): SEND -> transferFlowFiles, otherwise -> receiveFlowFiles.
  bool transferFlowFiles(core::ProcessContext& context, core::ProcessSession& session);
  bool receiveFlowFiles(core::ProcessContext& context, core::ProcessSession& session);

  bool confirmReceive(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id);
  bool confirmSend(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id);
  bool completeReceive(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id);
  bool completeSend(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id, core::ProcessContext& context);

  // Both report failure through the std::string alternative of std::expected.
  std::expected<void, std::string> readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result);
  std::expected<ReceiveFlowFileHeaderResult, std::string> receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction);

  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SiteToSiteClient>::getLogger()};
};
} // namespace sitetosite
} // namespace org::apache::nifi::minifi