blob: e34c94858e2f5883e23c2cf9a5cdc076cb3d9974 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "sitetosite/SiteToSiteClient.h"
#include <map>
#include <string>
#include <memory>
#include <ranges>
#include "minifi-cpp/utils/gsl.h"
#include "utils/Enum.h"
#include "sitetosite/CompressionOutputStream.h"
#include "sitetosite/CompressionInputStream.h"
#include "io/StreamPipe.h"
namespace org::apache::nifi::minifi::sitetosite {
std::optional<SiteToSiteResponse> SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/) {
uint8_t result_byte = 0;
if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret) || result_byte != CODE_SEQUENCE_VALUE_1) {
logger_->log_error("Site2Site read response failed: invalid code sequence 1 value");
return std::nullopt;
}
if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret) || result_byte != CODE_SEQUENCE_VALUE_2) {
logger_->log_error("Site2Site read response failed: invalid code sequence 2 value");
return std::nullopt;
}
if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret)) {
logger_->log_error("Site2Site read response failed: failed to read response code");
return std::nullopt;
}
SiteToSiteResponse response;
if (auto code = magic_enum::enum_cast<ResponseCode>(result_byte)) {
response.code = *code;
} else {
logger_->log_error("Site2Site read response failed: invalid response code");
return std::nullopt;
}
const ResponseCodeContext* response_code_context = getResponseCodeContext(response.code);
if (!response_code_context) {
logger_->log_error("Site2Site read response failed: invalid response code context");
return std::nullopt;
}
if (response_code_context->has_description) {
if (const auto ret = peer_->read(response.message); ret == 0 || io::isError(ret)) {
logger_->log_error("Site2Site read response failed: failed to read response message");
return std::nullopt;
}
}
return response;
}
void SiteToSiteClient::handleTransactionError(const std::shared_ptr<Transaction>& transaction, core::ProcessContext& context, const std::exception& exception) {
if (transaction) {
deleteTransaction(transaction->getUUID());
}
context.yield();
tearDown();
logger_->log_warn("Caught Exception, type: {}, what: {}", typeid(exception).name(), exception.what());
}
/**
 * Removes a transaction from the set of known transactions.
 *
 * Logs a warning and does nothing if no transaction with the given id is known.
 *
 * @param transaction_id id of the transaction to forget
 */
void SiteToSiteClient::deleteTransaction(const utils::Identifier& transaction_id) {
  const auto it = known_transactions_.find(transaction_id);
  if (it == known_transactions_.end()) {
    logger_->log_warn("Site2Site transaction id '{}' not found for delete", transaction_id.to_string());
    return;
  }
  logger_->log_debug("Site2Site delete transaction {}", it->second->getUUIDStr());
  // Erase through the iterator we already hold instead of performing a second key lookup.
  known_transactions_.erase(it);
}
bool SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, const SiteToSiteResponse& response) {
const ResponseCodeContext* response_code_context = getResponseCodeContext(response.code);
if (!response_code_context) {
logger_->log_error("Site2Site write response failed: invalid response code context for code {}", magic_enum::enum_underlying(response.code));
return false;
}
const std::array<uint8_t, 3> code_sequence = { CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, magic_enum::enum_underlying(response.code) };
const auto ret = peer_->write(code_sequence.data(), 3);
if (ret != 3) {
logger_->log_error("Site2Site write response failed: failed to write code sequence, expected 3 bytes, got {}", ret);
return false;
}
if (response_code_context->has_description) {
return !(io::isError(peer_->write(response.message)));
}
return true;
}
/**
 * Sends the FlowFiles available in `session` to the peer inside a single SEND transaction.
 *
 * FlowFiles are added to the transaction until the session has none left or the time spent in the transaction
 * exceeds batch_send_nanos_. The transaction is then confirmed and completed. Each FlowFile that was sent is
 * reported to provenance and removed from the session.
 *
 * @return false when the session has no FlowFile or bootstrapping the peer connection fails; true when the
 *         transaction was confirmed, completed and deleted
 * @throws Exception (SITE2SITE_EXCEPTION) if the handshake cannot be established, the transaction cannot be created,
 *         or sending/confirming/completing fails. For failures inside the transaction, handleTransactionError() runs
 *         before the exception is rethrown.
 */
bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::ProcessSession& session) {
  auto flow = session.get();
  if (!flow) {
    return false;
  }
  if (peer_state_ != PeerState::READY) {
    if (!bootstrap()) {
      return false;
    }
  }
  if (peer_state_ != PeerState::READY) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
  }
  auto transaction = createTransaction(TransferDirection::SEND);
  if (transaction == nullptr) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
  }
  utils::Identifier transaction_id = transaction->getUUID();
  // Measure the batch window with a monotonic clock. high_resolution_clock is not guaranteed to be steady,
  // so a wall-clock adjustment could otherwise distort the batch duration.
  const auto transaction_started_at = std::chrono::steady_clock::now();
  try {
    while (true) {
      auto start_time = std::chrono::steady_clock::now();
      if (!sendFlowFile(transaction, *flow, session)) {
        throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
      }
      logger_->log_debug("Site2Site transaction {} send flow record {}", transaction_id.to_string(), flow->getUUIDStr());
      auto end_time = std::chrono::steady_clock::now();
      std::string transit_uri = peer_->getURL() + "/" + flow->getUUIDStr();
      std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
      session.getProvenanceReporter()->send(*flow, transit_uri, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time), false);
      session.remove(flow);
      // Stop adding FlowFiles to this transaction once the configured batch duration has been exceeded.
      const std::chrono::nanoseconds transfer_duration = std::chrono::steady_clock::now() - transaction_started_at;
      if (transfer_duration > batch_send_nanos_) {
        break;
      }
      flow = session.get();
      if (!flow) {
        break;
      }
    }
    if (!confirm(transaction_id)) {
      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transaction_id.to_string());
    }
    if (!complete(context, transaction_id)) {
      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transaction_id.to_string());
    }
    logger_->log_debug("Site2Site transaction {} successfully sent flow record {}, content bytes {}", transaction_id.to_string(), transaction->getCurrentTransfers(), transaction->getBytes());
  } catch (const std::exception& exception) {
    handleTransactionError(transaction, context, exception);
    throw;
  }
  deleteTransaction(transaction_id);
  return true;
}
/**
 * Confirms a RECEIVE transaction with the peer once all data has been read.
 *
 * Sends CONFIRM_TRANSACTION carrying the locally computed CRC and waits for the peer's answer.
 *
 * @return true (and marks the transaction TRANSACTION_CONFIRMED) if the peer answers CONFIRM_TRANSACTION;
 *         false if data is still pending, I/O fails, or the peer answers BAD_CHECKSUM or another code
 */
bool SiteToSiteClient::confirmReceive(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id) {
  // Confirmation is only possible after the peer has indicated that no more data follows.
  if (transaction->isDataAvailable()) {
    return false;
  }
  // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
  // to peer so that we can verify that the connection is still open. This is a two-phase commit,
  // which helps to prevent the chances of data duplication. Without doing this, we may commit the
  // session and then when we send the response back to the peer, the peer may have timed out and may not
  // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
  // Critical Section involved in this transaction so that rather than the Critical Section being the
  // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
  const uint64_t local_crc = transaction->getCRC();
  logger_->log_debug("Site2Site Receive confirm with CRC {} to transaction {}", local_crc, transaction_id.to_string());
  if (!writeResponse(transaction, {ResponseCode::CONFIRM_TRANSACTION, std::to_string(local_crc)})) {
    return false;
  }

  const auto reply = readResponse(transaction);
  if (!reply) {
    return false;
  }
  switch (reply->code) {
    case ResponseCode::CONFIRM_TRANSACTION:
      logger_->log_debug("Site2Site transaction {} peer confirm transaction", transaction_id.to_string());
      transaction->setState(TransactionState::TRANSACTION_CONFIRMED);
      return true;
    case ResponseCode::BAD_CHECKSUM:
      logger_->log_debug("Site2Site transaction {} peer indicate bad checksum", transaction_id.to_string());
      return false;
    default:
      logger_->log_debug("Site2Site transaction {} peer unknown response code {}", transaction_id.to_string(), magic_enum::enum_underlying(reply->code));
      return false;
  }
}
/**
 * Confirms a SEND transaction with the peer.
 *
 * Sends FINISH_TRANSACTION and expects CONFIRM_TRANSACTION back. For protocol versions above 3, the CRC in the peer's
 * message must match the local transaction CRC; on a mismatch, BAD_CHECKSUM is sent on a best-effort basis.
 *
 * @return true (and marks the transaction TRANSACTION_CONFIRMED) when the final CONFIRM_TRANSACTION was written;
 *         false on any I/O failure, unexpected response code or CRC mismatch
 */
bool SiteToSiteClient::confirmSend(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id) {
  logger_->log_debug("Site2Site Send FINISH TRANSACTION for transaction {}", transaction_id.to_string());
  if (!writeResponse(transaction, {ResponseCode::FINISH_TRANSACTION, "FINISH_TRANSACTION"})) {
    return false;
  }

  // After FINISH_TRANSACTION the peer is expected to answer with CONFIRM_TRANSACTION.
  const auto reply = readResponse(transaction);
  if (!reply) {
    return false;
  }
  if (reply->code != ResponseCode::CONFIRM_TRANSACTION) {
    logger_->log_debug("Site2Site transaction {} peer unknown response code {}", transaction_id.to_string(), magic_enum::enum_underlying(reply->code));
    return false;
  }
  logger_->log_debug("Site2Site transaction {} peer confirm transaction with CRC {}", transaction_id.to_string(), reply->message);

  if (current_version_ > 3) {
    const std::string local_crc = std::to_string(transaction->getCRC());
    if (reply->message != local_crc) {
      logger_->log_debug("Site2Site transaction {} CRC not matched {}", transaction_id.to_string(), local_crc);
      // Best effort: the confirmation fails whether or not BAD_CHECKSUM reaches the peer.
      writeResponse(transaction, {ResponseCode::BAD_CHECKSUM, "BAD_CHECKSUM"});
      return false;
    }
    logger_->log_debug("Site2Site transaction {} CRC matched", transaction_id.to_string());
  }

  if (!writeResponse(transaction, {ResponseCode::CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"})) {
    return false;
  }
  transaction->setState(TransactionState::TRANSACTION_CONFIRMED);
  return true;
}
/**
 * Runs the confirmation phase of a known transaction, dispatching on its direction.
 *
 * A RECEIVE transaction that is still TRANSACTION_STARTED with no data available is confirmed locally without
 * any peer round trip. Otherwise the transaction must be in DATA_EXCHANGED state.
 *
 * @return true when the transaction ends up TRANSACTION_CONFIRMED; false if the peer is not ready, the id is
 *         unknown, the state does not allow confirmation, or the direction-specific confirmation fails
 */
bool SiteToSiteClient::confirm(const utils::Identifier& transaction_id) {
  if (peer_state_ != PeerState::READY) {
    bootstrap();  // success is judged by re-checking peer_state_ below
  }
  if (peer_state_ != PeerState::READY) {
    return false;
  }

  const auto found = known_transactions_.find(transaction_id);
  if (found == known_transactions_.end()) {
    return false;
  }
  auto transaction = found->second;

  const bool nothing_received = transaction->getState() == TransactionState::TRANSACTION_STARTED && !transaction->isDataAvailable();
  if (nothing_received && transaction->getDirection() == TransferDirection::RECEIVE) {
    transaction->setState(TransactionState::TRANSACTION_CONFIRMED);
    return true;
  }
  if (transaction->getState() != TransactionState::DATA_EXCHANGED) {
    return false;
  }
  return transaction->getDirection() == TransferDirection::RECEIVE
      ? confirmReceive(transaction, transaction_id)
      : confirmSend(transaction, transaction_id);
}
/**
 * Cancels a known transaction: notifies the peer with CANCEL_TRANSACTION, marks the transaction
 * TRANSACTION_CANCELED and tears down the connection.
 *
 * Does nothing if the peer is not ready, the id is unknown (a warning is logged), or the transaction is already
 * canceled, completed or in error.
 */
void SiteToSiteClient::cancel(const utils::Identifier& transaction_id) {
  if (peer_state_ != PeerState::READY) {
    return;
  }
  const auto found = known_transactions_.find(transaction_id);
  if (found == known_transactions_.end()) {
    logger_->log_warn("Site2Site transaction id '{}' not found for cancel", transaction_id.to_string());
    return;
  }
  auto transaction = found->second;

  const auto state = transaction->getState();
  const bool already_finished = state == TransactionState::TRANSACTION_CANCELED
      || state == TransactionState::TRANSACTION_COMPLETED
      || state == TransactionState::TRANSACTION_ERROR;
  if (already_finished) {
    logger_->log_debug("Site2Site transaction {} already canceled or completed or in error state", transaction_id.to_string());
    return;
  }

  // Best effort notification: the transaction is canceled locally even if the write fails.
  writeResponse(transaction, {ResponseCode::CANCEL_TRANSACTION, "Cancel"});
  transaction->setState(TransactionState::TRANSACTION_CANCELED);
  tearDown();
}
/**
 * Marks a known transaction as TRANSACTION_ERROR and tears down the connection.
 * Logs a warning and does nothing else if the id is unknown.
 */
void SiteToSiteClient::error(const utils::Identifier& transaction_id) {
  if (const auto found = known_transactions_.find(transaction_id); found != known_transactions_.end()) {
    // Hold a reference for the rest of the call, independent of the container.
    const auto transaction = found->second;
    transaction->setState(TransactionState::TRANSACTION_ERROR);
    tearDown();
    return;
  }
  logger_->log_warn("Site2Site transaction id '{}' not found for error", transaction_id.to_string());
}
/**
 * Completes a RECEIVE transaction.
 *
 * If any FlowFile was transferred in the current batch, TRANSACTION_FINISHED is sent to the peer first.
 *
 * @return true (and marks the transaction TRANSACTION_COMPLETED) on success; false if writing the response fails
 */
bool SiteToSiteClient::completeReceive(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id) {
  const bool received_any = transaction->getCurrentTransfers() != 0;
  if (received_any) {
    logger_->log_debug("Site2Site transaction {} receive finished", transaction_id.to_string());
    if (!writeResponse(transaction, {ResponseCode::TRANSACTION_FINISHED, "Finished"})) {
      return false;
    }
  }
  transaction->setState(TransactionState::TRANSACTION_COMPLETED);
  return true;
}
/**
 * Completes a SEND transaction by reading the peer's final response.
 *
 * TRANSACTION_FINISHED and TRANSACTION_FINISHED_BUT_DESTINATION_FULL both complete the transaction. The latter
 * additionally yields the processor context.
 *
 * @return true (and marks the transaction TRANSACTION_COMPLETED) on either finished code; false otherwise
 */
bool SiteToSiteClient::completeSend(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id, core::ProcessContext& context) {
  const auto reply = readResponse(transaction);
  if (!reply) {
    return false;
  }
  const bool destination_full = reply->code == ResponseCode::TRANSACTION_FINISHED_BUT_DESTINATION_FULL;
  if (reply->code != ResponseCode::TRANSACTION_FINISHED && !destination_full) {
    logger_->log_warn("Site2Site transaction {} peer unexpected response code {}: {}", transaction_id.to_string(), magic_enum::enum_underlying(reply->code), magic_enum::enum_name(reply->code));
    return false;
  }

  logger_->log_info("Site2Site transaction {} peer finished transaction", transaction_id.to_string());
  transaction->setState(TransactionState::TRANSACTION_COMPLETED);
  if (destination_full) {
    logger_->log_info("Site2Site transaction {} reported destination full, yielding", transaction_id.to_string());
    context.yield();
  }
  return true;
}
/**
 * Runs the completion phase of a known transaction, dispatching on its direction.
 *
 * A transaction that transferred any FlowFile must already be TRANSACTION_CONFIRMED.
 *
 * @return the result of completeReceive()/completeSend(); false if the peer is not ready, the id is unknown,
 *         or a transaction with transfers has not been confirmed
 */
bool SiteToSiteClient::complete(core::ProcessContext& context, const utils::Identifier& transaction_id) {
  if (peer_state_ != PeerState::READY) {
    bootstrap();  // success is judged by re-checking peer_state_ below
  }
  if (peer_state_ != PeerState::READY) {
    return false;
  }

  const auto found = known_transactions_.find(transaction_id);
  if (found == known_transactions_.end()) {
    logger_->log_warn("Site2Site transaction id '{}' not found for complete", transaction_id.to_string());
    return false;
  }
  auto transaction = found->second;

  const bool has_transfers = transaction->getTotalTransfers() > 0;
  if (has_transfers && transaction->getState() != TransactionState::TRANSACTION_CONFIRMED) {
    logger_->log_warn("Site2Site transaction {} not confirmed", transaction_id.to_string());
    return false;
  }
  return transaction->getDirection() == TransferDirection::RECEIVE
      ? completeReceive(transaction, transaction_id)
      : completeSend(transaction, transaction_id, context);
}
bool SiteToSiteClient::initializeSend(const std::shared_ptr<Transaction>& transaction) {
if (peer_state_ != PeerState::READY) {
bootstrap();
}
if (peer_state_ != PeerState::READY) {
return false;
}
if (!transaction) {
logger_->log_error("Site2Site transaction is null");
return false;
}
auto transaction_id = transaction->getUUID();
if (transaction->getState() != TransactionState::TRANSACTION_STARTED && transaction->getState() != TransactionState::DATA_EXCHANGED) {
logger_->log_warn("Site2Site transaction {} is not at started or exchanged state", transaction_id.to_string());
return false;
}
if (transaction->getDirection() != TransferDirection::SEND) {
logger_->log_warn("Site2Site transaction {} direction is wrong", transaction_id.to_string());
return false;
}
if (transaction->getCurrentTransfers() > 0) {
if (!writeResponse(transaction, {ResponseCode::CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"})) {
return false;
}
}
return true;
}
/**
 * Writes a FlowFile's attributes to `stream`: a 4-byte count followed by every key and value string.
 *
 * Strings are written with `write(str, true)`. This presumably selects the wide (larger) length-prefix
 * encoding; confirm against io::OutputStream.
 *
 * @return true if everything was written; false (after logging an error) at the first failed write
 */
bool SiteToSiteClient::writeAttributesInSendTransaction(io::OutputStream& stream, const std::string& transaction_id_str, const std::map<std::string, std::string>& attributes) {
  const auto count_written = stream.write(gsl::narrow<uint32_t>(attributes.size()));
  if (count_written != 4) {
    logger_->log_error("Failed to write number of attributes!");
    return false;
  }
  for (const auto& [key, value] : attributes) {
    const auto key_written = stream.write(key, true);
    if (key_written == 0 || io::isError(key_written)) {
      logger_->log_error("Failed to write attribute key {}!", key);
      return false;
    }
    const auto value_written = stream.write(value, true);
    if (value_written == 0 || io::isError(value_written)) {
      logger_->log_error("Failed to write attribute value {}!", value);
      return false;
    }
    logger_->log_debug("Site2Site transaction {} send attribute key {} value {}", transaction_id_str, key, value);
  }
  return true;
}
void SiteToSiteClient::finalizeSendTransaction(const std::shared_ptr<Transaction>& transaction, uint64_t sent_bytes) {
transaction->incrementCurrentTransfers();
transaction->incrementTotalTransfers();
transaction->setState(TransactionState::DATA_EXCHANGED);
transaction->addBytes(sent_bytes);
logger_->log_info("Site to Site transaction {} sent flow {} flow records, with total size {}", transaction->getUUIDStr(), transaction->getTotalTransfers(), transaction->getBytes());
}
/**
 * Writes one FlowFile into a SEND transaction: its attributes, an 8-byte content length, then the content.
 *
 * With compression enabled, data goes through a CRC stream that wraps a CompressionOutputStream, so the CRC covers the
 * uncompressed bytes. After flushing, that CRC is copied onto the transaction stream. A FlowFile whose resource claim
 * is missing or does not exist is sent with a zero length and no content.
 *
 * @return true if the FlowFile was fully written and recorded on the transaction; false if the transaction cannot
 *         send or any write/read step fails
 */
bool SiteToSiteClient::sendFlowFile(const std::shared_ptr<Transaction>& transaction, core::FlowFile& flow_file, core::ProcessSession& session) {
  if (!initializeSend(transaction)) {
    return false;
  }
  std::unique_ptr<CompressionOutputStream> compression_stream;
  std::unique_ptr<io::CRCStream<io::OutputStream>> compression_wrapper_crc_stream;
  if (use_compression_) {
    compression_stream = std::make_unique<CompressionOutputStream>(transaction->getStream());
    compression_wrapper_crc_stream = std::make_unique<io::CRCStream<io::OutputStream>>(gsl::make_not_null(compression_stream.get()));
  }
  io::OutputStream& stream = use_compression_ ? static_cast<io::OutputStream&>(*compression_wrapper_crc_stream) : static_cast<io::OutputStream&>(transaction->getStream());
  const auto& attributes = flow_file.getAttributes();
  if (!writeAttributesInSendTransaction(stream, transaction->getUUIDStr(), attributes)) {
    return false;
  }
  const bool flowfile_has_content = [&]() {
    if (flow_file.getResourceClaim() == nullptr || !flow_file.getResourceClaim()->exists()) {
      auto path = flow_file.getResourceClaim() != nullptr ? flow_file.getResourceClaim()->getContentFullPath() : "nullclaim";
      logger_->log_debug("Claim {} does not exist for FlowFile {}", path, flow_file.getUUIDStr());
      return false;
    }
    return true;
  }();
  uint64_t len = 0;
  if (flowfile_has_content) {
    len = flow_file.getSize();
    const auto ret = stream.write(len);
    if (ret != 8) {
      logger_->log_debug("Failed to write content size!");
      return false;
    }
    if (len > 0) {
      const auto read_result = session.read(flow_file, [&stream](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
        return internal::pipe(*input_stream, stream);
      });
      // A negative result signals a read/pipe failure. Check it before narrowing: gsl::narrow would throw
      // narrowing_error on a negative value instead of letting this function report the failure.
      if (read_result < 0 || flow_file.getSize() != gsl::narrow<uint64_t>(read_result)) {
        logger_->log_debug("Mismatched sizes {} {}", flow_file.getSize(), read_result);
        return false;
      }
    } else {
      logger_->log_trace("Flowfile empty {}", flow_file.getResourceClaim()->getContentFullPath());
    }
  } else {
    const auto ret = stream.write(len);  // Indicate zero length
    if (ret != 8) {
      logger_->log_debug("Failed to write content size (0)!");
      return false;
    }
  }
  if (compression_stream) {
    // Update the CRC value to use the uncompressed stream CRC
    compression_stream->flush();
    transaction->getStream().setCrc(compression_wrapper_crc_stream->getCRC());
  }
  finalizeSendTransaction(transaction, len);
  return true;
}
/**
 * Writes one DataPacket into its SEND transaction: the attributes, an 8-byte payload length, then the payload.
 *
 * The length field is written for every packet, including empty ones. The receiving side (readFlowFileHeaderData)
 * always reads 8 bytes of length after the attributes, so omitting it would desynchronize the stream. With
 * compression enabled, the CRC is computed over the uncompressed bytes and copied onto the transaction stream
 * after flushing.
 *
 * @return true if the packet was fully written and recorded on the transaction; false otherwise
 */
bool SiteToSiteClient::sendPacket(const DataPacket& packet) {
  if (!initializeSend(packet.transaction)) {
    return false;
  }
  const auto& transaction = packet.transaction;
  std::unique_ptr<CompressionOutputStream> compression_stream;
  std::unique_ptr<io::CRCStream<io::OutputStream>> compression_wrapper_crc_stream;
  if (use_compression_) {
    compression_stream = std::make_unique<CompressionOutputStream>(transaction->getStream());
    compression_wrapper_crc_stream = std::make_unique<io::CRCStream<io::OutputStream>>(gsl::make_not_null(compression_stream.get()));
  }
  io::OutputStream& stream = use_compression_ ? static_cast<io::OutputStream&>(*compression_wrapper_crc_stream) : static_cast<io::OutputStream&>(transaction->getStream());
  if (!writeAttributesInSendTransaction(stream, transaction->getUUIDStr(), packet.attributes)) {
    return false;
  }
  uint64_t len = packet.payload.length();
  if (const auto ret = stream.write(len); ret != 8) {
    logger_->log_debug("Failed to write payload size!");
    return false;
  }
  if (len > 0) {
    if (const auto ret = stream.write(reinterpret_cast<const uint8_t*>(packet.payload.c_str()), gsl::narrow<size_t>(len)); ret != gsl::narrow<size_t>(len)) {
      logger_->log_debug("Failed to write payload!");
      return false;
    }
  }
  if (compression_stream) {
    // Update the CRC value to use the uncompressed stream CRC
    compression_stream->flush();
    transaction->getStream().setCrc(compression_wrapper_crc_stream->getCRC());
  }
  finalizeSendTransaction(transaction, len);
  return true;
}
/**
 * Reads the attribute map and content length of one FlowFile from `stream` into `result`.
 *
 * Layout: a uint32_t attribute count (rejected if above MAX_NUM_ATTRIBUTES), that many key/value strings
 * (read with `read(str, true)`), then a uint64_t content length.
 *
 * @return true on success; false (after logging an error) if a field cannot be read or the count is invalid
 */
bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id_str, SiteToSiteClient::ReceiveFlowFileHeaderResult& result) {
  uint32_t attribute_count = 0;
  const auto count_status = stream.read(attribute_count);
  if (count_status == 0 || io::isError(count_status) || attribute_count > MAX_NUM_ATTRIBUTES) {
    logger_->log_error("Site2Site failed to read number of attributes with return code {}, or number of attributes is invalid: {}", count_status, attribute_count);
    return false;
  }
  logger_->log_debug("Site2Site transaction {} receives {} attributes", transaction_id_str, attribute_count);

  for (uint32_t index = 0; index < attribute_count; ++index) {
    std::string key;
    const auto key_status = stream.read(key, true);
    if (key_status == 0 || io::isError(key_status)) {
      logger_->log_error("Site2Site transaction {} failed to read attribute key", transaction_id_str);
      return false;
    }
    std::string value;
    const auto value_status = stream.read(value, true);
    if (value_status == 0 || io::isError(value_status)) {
      logger_->log_error("Site2Site transaction {} failed to read attribute value for key {}", transaction_id_str, key);
      return false;
    }
    result.attributes[key] = value;
    logger_->log_debug("Site2Site transaction {} receives attribute key {} value {}", transaction_id_str, key, value);
  }

  uint64_t data_size = 0;
  const auto size_status = stream.read(data_size);
  if (size_status == 0 || io::isError(size_status)) {
    logger_->log_error("Site2Site transaction {} failed to read flow file data size", transaction_id_str);
    return false;
  }
  result.flow_file_data_size = data_size;
  return true;
}
/**
 * Reads the header (attributes and content length) of the next FlowFile of a RECEIVE transaction.
 *
 * Before every FlowFile except the first, a response code is read via readResponse() (which reads from the peer
 * connection directly). CONTINUE_TRANSACTION means another FlowFile follows; FINISH_TRANSACTION ends the data
 * exchange.
 *
 * @param stream stream the FlowFile header is read from (readFlowFiles passes the compression/CRC wrapper when
 *        compression is enabled)
 * @param transaction the RECEIVE transaction being processed
 * @return a result with eof == false when a FlowFile header was read; a result with eof == true when no more data
 *         is available (including an empty FlowFile without attributes); std::nullopt if the peer is not ready, the
 *         transaction state/direction is wrong, or a protocol/read error occurs
 */
std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction) {
  // bootstrap()'s return value is not used; readiness is re-checked through peer_state_.
  if (peer_state_ != PeerState::READY) {
    bootstrap();
  }
  if (peer_state_ != PeerState::READY) {
    return std::nullopt;
  }
  const auto transaction_id_str = transaction->getUUIDStr();
  if (transaction->getState() != TransactionState::TRANSACTION_STARTED && transaction->getState() != TransactionState::DATA_EXCHANGED) {
    logger_->log_warn("Site2Site transaction {} is not at started or exchanged state", transaction_id_str);
    return std::nullopt;
  }
  if (transaction->getDirection() != TransferDirection::RECEIVE) {
    logger_->log_warn("Site2Site transaction {} direction is wrong", transaction_id_str);
    return std::nullopt;
  }
  ReceiveFlowFileHeaderResult result;
  // Nothing left to read: report end of data without touching the stream.
  if (!transaction->isDataAvailable()) {
    result.eof = true;
    return result;
  }
  if (transaction->getCurrentTransfers() > 0) {
    // if we already have transferred a flow file before, check to see whether another one is available
    auto response = readResponse(transaction);
    if (!response) {
      return std::nullopt;
    }
    if (response->code == ResponseCode::CONTINUE_TRANSACTION) {
      logger_->log_debug("Site2Site transaction {} peer indicate continue transaction", transaction_id_str);
      transaction->setDataAvailable(true);
    } else if (response->code == ResponseCode::FINISH_TRANSACTION) {
      logger_->log_debug("Site2Site transaction {} peer indicate finish transaction", transaction_id_str);
      transaction->setDataAvailable(false);
      result.eof = true;
      return result;
    } else {
      logger_->log_debug("Site2Site transaction {} peer indicate wrong response code {}", transaction_id_str, magic_enum::enum_underlying(response->code));
      return std::nullopt;
    }
  }
  // Defensive re-check of the data-available flag before reading the header.
  if (!transaction->isDataAvailable()) {
    logger_->log_debug("No data is available");
    result.eof = true;
    return result;
  }
  if (!readFlowFileHeaderData(stream, transaction_id_str, result)) {
    logger_->log_error("Site2Site transaction {} failed to read flow file header data", transaction_id_str);
    return std::nullopt;
  }
  // A header with neither content nor attributes is treated as the end of the data, not as a FlowFile.
  if (result.flow_file_data_size > 0 || !result.attributes.empty()) {
    transaction->incrementCurrentTransfers();
    transaction->incrementTotalTransfers();
  } else {
    logger_->log_warn("Site2Site transaction {} empty flow file without attribute", transaction_id_str);
    transaction->setDataAvailable(false);
    result.eof = true;
    return result;
  }
  transaction->setState(TransactionState::DATA_EXCHANGED);
  transaction->addBytes(result.flow_file_data_size);
  logger_->log_info("Site to Site transaction {} received flow record {}, total length {}, added {}",
      transaction_id_str, transaction->getTotalTransfers(), transaction->getBytes(), result.flow_file_data_size);
  return result;
}
/**
 * Reads every FlowFile the peer sends in a RECEIVE transaction and transfers each one to the "undefined" relationship,
 * emitting a provenance RECEIVE event for it.
 *
 * With compression enabled, data is read through a CRC stream that wraps a CompressionInputStream, so the CRC covers
 * the uncompressed bytes. That CRC is copied onto the transaction stream after the last FlowFile.
 *
 * @return {number of FlowFiles received, total content bytes received}
 * @throws Exception (SITE2SITE_EXCEPTION) if a header cannot be read, a FlowFile cannot be created, or the received
 *         content size differs from the size announced in the header
 */
std::pair<uint64_t, uint64_t> SiteToSiteClient::readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
  uint64_t transfers = 0;
  uint64_t bytes = 0;
  std::unique_ptr<CompressionInputStream> compression_stream;
  std::unique_ptr<io::CRCStream<io::InputStream>> compression_wrapper_crc_stream;
  if (use_compression_) {
    compression_stream = std::make_unique<CompressionInputStream>(transaction->getStream());
    compression_wrapper_crc_stream = std::make_unique<io::CRCStream<io::InputStream>>(gsl::make_not_null(compression_stream.get()));
  }
  io::InputStream& stream = use_compression_ ? static_cast<io::InputStream&>(*compression_wrapper_crc_stream) : static_cast<io::InputStream&>(transaction->getStream());
  while (true) {
    auto start_time = std::chrono::steady_clock::now();
    auto receive_header_result = receiveFlowFileHeader(stream, transaction);
    if (!receive_header_result) {
      throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transaction->getUUIDStr());
    }
    if (receive_header_result->eof) {
      // transaction done
      break;
    }
    auto flow_file = session.create();
    if (!flow_file) {
      throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
    }
    std::string source_identifier;
    for (const auto& [key, value] : receive_header_result->attributes) {
      // The sender's UUID attribute identifies the source in the provenance event below.
      if (key == core::SpecialFlowAttribute::UUID) {
        source_identifier = value;
      }
      flow_file->addAttribute(key, value);
    }
    const uint64_t expected_size = receive_header_result->flow_file_data_size;
    if (expected_size > 0) {
      session.write(flow_file, [expected_size, &stream](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
        uint64_t remaining = expected_size;
        std::array<std::byte, utils::configuration::DEFAULT_BUFFER_SIZE> buffer{};
        while (remaining > 0) {
          const auto size = std::min(remaining, uint64_t{utils::configuration::DEFAULT_BUFFER_SIZE});
          const auto ret = stream.read(std::as_writable_bytes(std::span(buffer).subspan(0, size)));
          if (ret != size) {
            return -1;
          }
          // Report failed writes instead of silently dropping the chunk.
          if (output_stream->write(std::span(buffer).subspan(0, size)) != size) {
            return -1;
          }
          remaining -= size;
        }
        return gsl::narrow<int64_t>(expected_size);
      });
      if (flow_file->getSize() != expected_size) {
        throw Exception(SITE2SITE_EXCEPTION, "Receive size not correct, expected to receive " + std::to_string(expected_size) +
            " bytes, but actually received " + std::to_string(flow_file->getSize()));
      }
    }
    core::Relationship relation{"undefined", ""};
    auto end_time = std::chrono::steady_clock::now();
    std::string transitUri = peer_->getURL() + "/" + source_identifier;
    std::string details = "urn:nifi:" + source_identifier + "Remote Host=" + peer_->getHostName();
    session.getProvenanceReporter()->receive(*flow_file, transitUri, source_identifier, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time));
    session.transfer(flow_file, relation);
    bytes += expected_size;
    transfers++;
    if (compression_stream) {
      // Non-compressed response codes are written between flow files, so we need to reset the compression stream buffer for the next flow file
      compression_stream->resetBuffer();
    }
  }
  if (compression_stream) {
    // Update the CRC value to use the uncompressed stream CRC
    transaction->getStream().setCrc(compression_wrapper_crc_stream->getCRC());
  }
  return {transfers, bytes};
}
/**
 * Receives FlowFiles from the peer inside a single RECEIVE transaction, then confirms and completes it.
 *
 * Confirmation is skipped when no FlowFile arrived, and in that case the context is yielded.
 *
 * @return false if bootstrapping the peer connection fails; true after a successful transaction
 * @throws Exception (SITE2SITE_EXCEPTION) if the handshake cannot be established, the transaction cannot be created,
 *         or receiving/confirming/completing fails. For failures inside the transaction, handleTransactionError()
 *         runs before the exception is rethrown.
 */
bool SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::ProcessSession& session) {
  if (peer_state_ != PeerState::READY && !bootstrap()) {
    return false;
  }
  if (peer_state_ != PeerState::READY) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
  }

  auto transaction = createTransaction(TransferDirection::RECEIVE);
  if (transaction == nullptr) {
    context.yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
  }
  const utils::Identifier transaction_id = transaction->getUUID();

  try {
    const auto [transfers, bytes] = readFlowFiles(transaction, session);
    const bool received_any = transfers > 0;
    if (received_any && !confirm(transaction_id)) {
      throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
    }
    if (!complete(context, transaction_id)) {
      throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction " + transaction_id.to_string() + " Failed");
    }
    logger_->log_info("Site to Site transaction {} received flow record {}, with content size {} bytes", transaction_id.to_string(), transfers, bytes);
    // Nothing arrived: back off before the next attempt.
    if (!received_any) {
      context.yield();
    }
  } catch (const std::exception& exception) {
    handleTransactionError(transaction, context, exception);
    throw;
  }

  deleteTransaction(transaction_id);
  return true;
}
/**
 * Looks up the static ResponseCodeContext entry for a response code.
 *
 * @return pointer to the matching entry in response_code_contexts, or nullptr if the code has none
 */
const ResponseCodeContext* SiteToSiteClient::getResponseCodeContext(ResponseCode code) {
  for (const auto& candidate : response_code_contexts) {
    if (candidate.code == code) {
      return &candidate;
    }
  }
  return nullptr;
}
} // namespace org::apache::nifi::minifi::sitetosite