// blob: 1eeb5a00980ef2ec3db4bb824e33fd5bf267be28 [file] [log] [blame]
/**
* @file ProcessSession.cpp
* ProcessSession class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/ProcessSession.h"
#include <algorithm>
#include <chrono>
#include <cinttypes>
#include <ctime>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "core/ProcessSessionReadCallback.h"
#include "io/StreamSlice.h"
#include "io/StreamPipe.h"
#include "minifi-cpp/utils/gsl.h"
#include "core/Processor.h"
/* This implementation is only for native Windows systems. */
#if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__
#define _WINSOCKAPI_
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <Windows.h>
#pragma comment(lib, "Ws2_32.lib")
#include <direct.h>
/**
 * Windows stand-in for the POSIX getpagesize() call.
 * @return a fixed page size of 4096 bytes
 */
int getpagesize(void) {
  // The GetSystemInfo-based lookup below is kept for reference but is not used:
  // SYSTEM_INFO system_info;
  // GetSystemInfo(&system_info);
  // return system_info.dwPageSize;
  constexpr int fixed_page_size = 4096;
  return fixed_page_size;
}
#else
#include <unistd.h>
#endif
namespace org::apache::nifi::minifi::core {
// Definition of the static ID generator member; exportContent() uses it to generate the UUID suffix of temporary file names.
std::shared_ptr<utils::IdGenerator> ProcessSessionImpl::id_generator_ = utils::IdGenerator::getIdGenerator();
/**
 * Creates a session bound to the given process context.
 *
 * Sets up the provenance reporter and the content repository session, and starts a
 * state manager transaction when the context has a state manager.
 *
 * @param processContext context of the processor this session works for
 * @throws Exception (PROCESS_SESSION_EXCEPTION) if the state manager transaction cannot be started
 */
ProcessSessionImpl::ProcessSessionImpl(std::shared_ptr<ProcessContext> processContext)
    : process_context_(std::move(processContext)),
      logger_(logging::LoggerFactory<ProcessSession>::getLogger()),
      stateManager_(process_context_->hasStateManager() ? process_context_->getStateManager() : nullptr) {
  logger_->log_trace("ProcessSession created for {}", process_context_->getProcessor().getName());

  // The processor name is handed to the provenance reporter twice.
  auto provenance_repository = process_context_->getProvenanceRepository();
  provenance_report_ = std::make_shared<provenance::ProvenanceReporterImpl>(
      provenance_repository,
      process_context_->getProcessor().getName(),
      process_context_->getProcessor().getName());
  content_session_ = process_context_->getContentRepository()->createSession();

  if (!stateManager_) {
    return;
  }
  if (!stateManager_->beginTransaction()) {
    throw Exception(PROCESS_SESSION_EXCEPTION, "State manager transaction could not be initiated.");
  }
}
/**
 * Destroys the session and removes its references.
 *
 * Calls std::terminate() if a state manager transaction is still in progress,
 * i.e. the session ended without the state being committed or rolled back.
 */
ProcessSessionImpl::~ProcessSessionImpl() {
  const bool state_undecided = stateManager_ && stateManager_->isTransactionInProgress();
  if (state_undecided) {
    logger_->log_critical("Session has ended without decision on state (commit or rollback).");
    std::terminate();
  }

  removeReferences();
}
/**
 * Registers an externally created flow file as added by this session.
 *
 * @param record flow file to add; its deleted flag is cleared
 * @throws Exception (PROCESSOR_EXCEPTION) if the flow file is already tracked among the session's updated flow files
 */
void ProcessSessionImpl::add(const std::shared_ptr<core::FlowFile> &record) {
  const utils::Identifier record_id = record->getUUID();

  const bool provided_by_session = updated_flowfiles_.contains(record_id);
  if (provided_by_session) {
    throw Exception(ExceptionType::PROCESSOR_EXCEPTION, "Mustn't add file that was provided by this session");
  }

  added_flowfiles_[record_id].flow_file = record;
  record->setDeleted(false);
}
/**
 * Creates a new flow file record, tracks it among the session's added flow files
 * and reports its creation to the provenance reporter.
 *
 * When a parent is given, its attributes (except ALTERNATE_IDENTIFIER, DISCARD_REASON and UUID),
 * its lineage start date and its lineage identifiers are copied, and the parent's UUID is
 * appended to the new record's lineage identifiers.
 *
 * @param parent optional flow file to inherit from; may be nullptr
 * @return the newly created flow file
 */
std::shared_ptr<core::FlowFile> ProcessSessionImpl::create(const core::FlowFile* const parent) {
  auto record = std::make_shared<FlowFileRecordImpl>();

  if (auto flow_version = process_context_->getProcessor().getFlowIdentifier(); flow_version != nullptr) {
    record->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
  }

  if (parent != nullptr) {
    for (const auto& [name, value] : parent->getAttributes()) {
      // Special attributes are never inherited from the parent
      const bool is_special = name == SpecialFlowAttribute::ALTERNATE_IDENTIFIER
          || name == SpecialFlowAttribute::DISCARD_REASON
          || name == SpecialFlowAttribute::UUID;
      if (!is_special) {
        record->setAttribute(name, value);
      }
    }
    record->setLineageStartDate(parent->getlineageStartDate());
    record->setLineageIdentifiers(parent->getlineageIdentifiers());
    record->getlineageIdentifiers().push_back(parent->getUUID());
  }

  added_flowfiles_[record->getUUID()].flow_file = record;
  logger_->log_debug("Create FlowFile with UUID {}", record->getUUIDStr());

  const std::string creation_details = process_context_->getProcessor().getName() + " creates flow record " + record->getUUIDStr();
  provenance_report_->create(*record, creation_details);
  return record;
}
/**
 * Creates a child of the given flow file that shares the parent's resource claim.
 *
 * @param parent flow file to clone
 * @return the clone created through create(); offset and size are copied only when the parent has a claim
 */
std::shared_ptr<core::FlowFile> ProcessSessionImpl::clone(const core::FlowFile& parent) {
  std::shared_ptr<core::FlowFile> child = this->create(&parent);
  if (!child) {
    return child;
  }

  logger_->log_debug("Cloned parent flow files {} to {}", parent.getUUIDStr(), child->getUUIDStr());

  // The clone points at the same content as the parent
  const std::shared_ptr<ResourceClaim> shared_claim = parent.getResourceClaim();
  child->setResourceClaim(shared_claim);
  if (shared_claim != nullptr) {
    child->setOffset(parent.getOffset());
    child->setSize(parent.getSize());
  }

  provenance_report_->clone(parent, *child);
  return child;
}
/**
 * Clones a flow file while it is being routed to more than one connection.
 *
 * Unlike create()/clone(), the new record is stored in cloned_flowfiles_ instead of
 * added_flowfiles_, and no creation event is reported; only the clone event is.
 *
 * @param parent flow file to duplicate
 * @return the duplicate, carrying the parent's non-special attributes, lineage and resource claim
 */
std::shared_ptr<core::FlowFile> ProcessSessionImpl::cloneDuringTransfer(const core::FlowFile& parent) {
  auto duplicate = std::make_shared<FlowFileRecordImpl>();

  if (auto flow_version = process_context_->getProcessor().getFlowIdentifier(); flow_version != nullptr) {
    duplicate->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
  }

  this->cloned_flowfiles_.push_back(duplicate);
  logger_->log_debug("Clone FlowFile with UUID {} during transfer", duplicate->getUUIDStr());

  // Inherit every attribute except the special ones
  for (const auto& [name, value] : parent.getAttributes()) {
    const bool is_special = name == SpecialFlowAttribute::ALTERNATE_IDENTIFIER
        || name == SpecialFlowAttribute::DISCARD_REASON
        || name == SpecialFlowAttribute::UUID;
    if (!is_special) {
      duplicate->setAttribute(name, value);
    }
  }

  // Inherit the lineage and extend it with the parent itself
  duplicate->setLineageStartDate(parent.getlineageStartDate());
  duplicate->setLineageIdentifiers(parent.getlineageIdentifiers());
  duplicate->getlineageIdentifiers().push_back(parent.getUUID());

  // Share the parent's content
  const std::shared_ptr<ResourceClaim> shared_claim = parent.getResourceClaim();
  duplicate->setResourceClaim(shared_claim);
  if (shared_claim) {
    duplicate->setOffset(parent.getOffset());
    duplicate->setSize(parent.getSize());
  }

  provenance_report_->clone(parent, *duplicate);
  return duplicate;
}
/**
 * Creates a child flow file whose content is a copy of a slice of the parent's content.
 *
 * @param parent flow file to clone from
 * @param offset start of the slice within the parent's content
 * @param size length of the slice
 * @return the new flow file, or nullptr when offset + size exceeds the parent's size
 */
std::shared_ptr<core::FlowFile> ProcessSessionImpl::clone(const FlowFile& parent, int64_t offset, int64_t size) {
  const bool slice_out_of_range = gsl::narrow<uint64_t>(offset + size) > parent.getSize();
  if (slice_out_of_range) {
    logger_->log_error("clone offset {} and size {} exceed parent size {}", offset, size, parent.getSize());
    return nullptr;
  }

  std::shared_ptr<core::FlowFile> child = this->create(&parent);
  if (!child) {
    return child;
  }

  logger_->log_debug("Cloned parent flow files {} to {}, with {}:{}", parent.getUUIDStr(), child->getUUIDStr(), offset, size);
  if (parent.getResourceClaim()) {
    // Pipe the requested slice of the parent's content into the child's freshly written content
    const auto copy_slice = [&](const std::shared_ptr<io::OutputStream>& output) -> int64_t {
      return read(parent, [&](const std::shared_ptr<io::InputStream>& input) -> int64_t {
        io::StreamSlice slice(input, offset, size);
        return minifi::internal::pipe(slice, *output);
      });
    };
    write(child, copy_slice);
  }

  provenance_report_->clone(parent, *child);
  return child;
}
/**
 * Marks a flow file as deleted, remembers it among the session's deleted flow files
 * and reports the drop to the provenance reporter.
 *
 * @param flow flow file to remove
 */
void ProcessSessionImpl::remove(const std::shared_ptr<core::FlowFile> &flow) {
  logger_->log_debug("Removing flow file with UUID: {}", flow->getUUIDStr());

  flow->setDeleted(true);
  deleted_flowfiles_.push_back(flow);

  const std::string drop_reason = fmt::format("{} drop flow record {}", process_context_->getProcessor().getName(), flow->getUUIDStr());
  provenance_report_->drop(*flow, drop_reason);
}
/**
 * Sets an attribute on the flow file and reports the modification to the provenance reporter.
 *
 * @param flow_file flow file to modify
 * @param key attribute name
 * @param value attribute value
 */
void ProcessSessionImpl::putAttribute(core::FlowFile& flow_file, std::string_view key, const std::string& value) {
  flow_file.setAttribute(key, value);

  const std::string change_details = fmt::format(
      "{} modify flow record {} attribute {}:{}",
      process_context_->getProcessor().getName(), flow_file.getUUIDStr(), key, value);
  provenance_report_->modifyAttributes(flow_file, change_details);
}
/**
 * Removes an attribute from the flow file and reports the modification to the provenance reporter.
 *
 * @param flow_file flow file to modify
 * @param key name of the attribute to remove
 */
void ProcessSessionImpl::removeAttribute(core::FlowFile& flow_file, std::string_view key) {
  flow_file.removeAttribute(key);

  const std::string change_details = fmt::format(
      "{} remove flow record {} attribute {}",
      process_context_->getProcessor().getName(), flow_file.getUUIDStr(), key);
  provenance_report_->modifyAttributes(flow_file, change_details);
}
/**
 * Penalizes the flow file for the processor's configured penalization period.
 *
 * @param flow flow file to penalize; must be backed by a FlowFileImpl
 * @throws Exception (PROCESS_SESSION_EXCEPTION) if the flow file is not a FlowFileImpl instance
 */
void ProcessSessionImpl::penalize(const std::shared_ptr<core::FlowFile> &flow) {
  const std::chrono::milliseconds penalization_period = process_context_->getProcessor().getPenalizationPeriod();
  logger_->log_info("Penalizing {} for {} at {}", flow->getUUIDStr(), penalization_period, process_context_->getProcessor().getName());

  // dynamic_pointer_cast yields an empty pointer when the dynamic type does not match;
  // fail with an exception instead of dereferencing a null pointer.
  const auto flow_impl = std::dynamic_pointer_cast<FlowFileImpl>(flow);
  if (!flow_impl) {
    throw Exception(PROCESS_SESSION_EXCEPTION, "Cannot penalize a flow file that is not a FlowFileImpl instance");
  }
  flow_impl->penalize(penalization_period);
}
/**
 * Records the relationship the flow file should be routed to.
 *
 * The relationship is stored in relationships_; flow files added by this session keep a
 * pointer to it in added_flowfiles_, all others in updated_relationships_. The flow file's
 * deleted flag is cleared.
 *
 * @param flow flow file to transfer
 * @param relationship target relationship
 */
void ProcessSessionImpl::transfer(const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship) {
  logger_->log_debug("Transferring {} from {} to relationship {}", flow->getUUIDStr(), process_context_->getProcessor().getName(), relationship.getName());

  // insert() returns the already stored element when an equal relationship is present
  const auto* stored_relationship = &*relationships_.insert(relationship).first;

  const utils::Identifier flow_id = flow->getUUID();
  const auto added_it = added_flowfiles_.find(flow_id);
  if (added_it != added_flowfiles_.end()) {
    added_it->second.rel = stored_relationship;
  } else {
    updated_relationships_[flow_id] = stored_relationship;
  }

  flow->setDeleted(false);
}
/**
 * Transfers the flow file to a relationship built from the given name.
 *
 * @param flow flow file to transfer
 * @param relationship_name used for both constructor arguments of the Relationship
 */
void ProcessSessionImpl::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow, const std::string& relationship_name) {
  const Relationship custom_relationship{relationship_name, relationship_name};
  transfer(flow, custom_relationship);
}
/**
 * Convenience overload that forwards to write(core::FlowFile&, callback).
 *
 * @param flow flow file whose content is replaced
 * @param callback writes the new content to the provided output stream
 */
void ProcessSessionImpl::write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
  core::FlowFile& target = *flow;
  write(target, callback);
}
/**
 * Replaces the content of a flow file with whatever the callback writes.
 *
 * A new resource claim is created, the callback writes into its stream, and on success the
 * flow file is pointed at the new claim with offset 0 and the stream's size. The modification
 * is reported to the provenance reporter and, when metrics are set, counted as bytes written.
 *
 * @param flow flow file to write to; audited to be one of the session's updated, added or cloned flow files
 * @param callback writes the content; a negative return value signals failure
 * @throws Exception (FILE_OPERATION_EXCEPTION) if the stream cannot be opened or the callback fails;
 *         any exception is logged at debug level and rethrown
 */
void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallback& callback) {
  gsl_ExpectsAudit(updated_flowfiles_.contains(flow.getUUID())
      || added_flowfiles_.contains(flow.getUUID())
      || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), [&flow](const auto& flow_file) { return &flow == flow_file.get(); }));

  std::shared_ptr<ResourceClaim> new_claim = content_session_->create();
  try {
    const auto write_started = std::chrono::steady_clock::now();

    std::shared_ptr<io::BaseStream> content_stream = content_session_->write(new_claim);
    if (!content_stream) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
    }

    // Let the caller produce the content
    const auto callback_status = callback(content_stream);
    if (callback_status < 0) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
    }

    flow.setSize(content_stream->size());
    flow.setOffset(0);
    flow.setResourceClaim(new_claim);
    content_stream->close();

    const std::string change_details = fmt::format("{} modify flow record content {}", process_context_->getProcessor().getName(), flow.getUUIDStr());
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - write_started);
    provenance_report_->modifyContent(flow, change_details, elapsed);

    if (metrics_) {
      metrics_->bytesWritten() += content_stream->size();
    }
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during process session write, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}
/**
 * Character-buffer overload: reinterprets the buffer as bytes and forwards to the byte overload.
 *
 * @param flow_file flow file whose content is replaced
 * @param buffer new content
 */
void ProcessSessionImpl::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const char> buffer) {
  const auto raw_bytes = as_bytes(buffer);
  writeBuffer(flow_file, raw_bytes);
}
/**
 * Replaces the content of the flow file with the given bytes.
 *
 * @param flow_file flow file whose content is replaced
 * @param buffer new content
 */
void ProcessSessionImpl::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const std::byte> buffer) {
  write(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
    const auto write_status = output_stream->write(buffer);
    if (io::isError(write_status)) {
      // A negative value tells write() that the callback failed
      return -1;
    }
    return gsl::narrow<int64_t>(write_status);
  });
}
/**
 * Appends whatever the callback writes to the end of the flow file's content.
 *
 * Falls back to write() when the flow file has no resource claim yet. Otherwise the content
 * session opens an append stream at offset + size; the flow file's size grows by the number of
 * bytes the stream gained during the callback.
 *
 * @param flow flow file to append to; audited to be one of the session's updated, added or cloned flow files
 * @param callback writes the additional content; a negative return value signals failure
 * @throws Exception (FILE_OPERATION_EXCEPTION) if the stream cannot be opened or the callback fails;
 *         any exception is logged at debug level and rethrown
 */
void ProcessSessionImpl::append(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
  gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID())
      || added_flowfiles_.contains(flow->getUUID())
      || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), [&flow](const auto& flow_file) { return flow == flow_file; }));

  std::shared_ptr<ResourceClaim> existing_claim = flow->getResourceClaim();
  if (existing_claim == nullptr) {
    // Nothing to append to yet: writing creates the first claim
    write(flow, callback);
    return;
  }

  try {
    const auto append_started = std::chrono::steady_clock::now();

    const size_t content_end = flow->getOffset() + flow->getSize();
    // The content session may hand back a different claim; the flow file then follows it
    std::shared_ptr<io::BaseStream> content_stream = content_session_->append(
        existing_claim, content_end,
        [&flow](const auto& replacement_claim) { flow->setResourceClaim(replacement_claim); });
    if (!content_stream) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
    }

    const size_t size_before = flow->getSize();
    const size_t stream_size_before = content_stream->size();
    // this prevents an issue if we write, above, with zero length.
    if (stream_size_before > 0) {
      content_stream->seek(stream_size_before);
    }

    const auto callback_status = callback(content_stream);
    if (callback_status < 0) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
    }

    flow->setSize(size_before + (content_stream->size() - stream_size_before));
    if (metrics_) {
      metrics_->bytesWritten() += content_stream->size() - stream_size_before;
    }

    const std::string change_details = process_context_->getProcessor().getName() + " modify flow record content " + flow->getUUIDStr();
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - append_started);
    provenance_report_->modifyContent(*flow, change_details, elapsed);
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception during process session append, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during process session append, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}
/**
 * Character-buffer overload: reinterprets the buffer as bytes and forwards to the byte overload.
 *
 * @param flow_file flow file to append to
 * @param buffer content to append
 */
void ProcessSessionImpl::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const char> buffer) {
  const auto raw_bytes = as_bytes(buffer);
  appendBuffer(flow_file, raw_bytes);
}
/**
 * Appends the given bytes to the flow file's content; an empty buffer is a no-op.
 *
 * @param flow_file flow file to append to
 * @param buffer content to append
 */
void ProcessSessionImpl::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const std::byte> buffer) {
  if (buffer.empty()) {
    return;
  }

  append(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
    const auto write_status = output_stream->write(buffer);
    if (io::isError(write_status)) {
      // A negative value tells append() that the callback failed
      return -1;
    }
    return gsl::narrow<int64_t>(write_status);
  });
}
/**
 * Opens a read stream restricted to the flow file's own part of its resource claim.
 *
 * @param flow_file flow file to read
 * @return a StreamSlice covering [offset, offset + size) of the claim's content,
 *         or an empty pointer when the flow file has no claim and its size is 0
 * @throws Exception (FILE_OPERATION_EXCEPTION) if there is no claim but the size is non-zero,
 *         or if the content cannot be opened for reading
 */
std::shared_ptr<io::InputStream> ProcessSessionImpl::getFlowFileContentStream(const core::FlowFile& flow_file) {
  std::shared_ptr<ResourceClaim> claim = flow_file.getResourceClaim();
  if (!claim) {
    logger_->log_debug("For {}, no resource claim but size is {}", flow_file.getUUIDStr(), flow_file.getSize());
    if (flow_file.getSize() != 0) {
      throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
    }
    return nullptr;
  }

  std::shared_ptr<io::InputStream> claim_stream = content_session_->read(claim);
  if (!claim_stream) {
    throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
  }

  return std::make_shared<io::StreamSlice>(claim_stream, flow_file.getOffset(), flow_file.getSize());
}
/**
 * Convenience overload that forwards to read(const core::FlowFile&, callback).
 *
 * @param flow_file flow file to read
 * @param callback consumes the content stream
 * @return the value returned by the forwarded call
 */
int64_t ProcessSessionImpl::read(const std::shared_ptr<core::FlowFile>& flow_file, const io::InputStreamCallback& callback) {
  core::FlowFile& target = *flow_file;
  return read(target, callback);
}
/**
 * Passes the flow file's content stream to the callback.
 *
 * @param flow_file flow file to read
 * @param callback consumes the content stream; a negative return value signals failure
 * @return 0 when the flow file has no content stream, otherwise the callback's return value,
 *         which is also added to the bytes-read metric when metrics are set
 * @throws Exception (FILE_OPERATION_EXCEPTION) if the callback fails; any exception is logged
 *         at debug level and rethrown
 */
int64_t ProcessSessionImpl::read(const core::FlowFile& flow_file, const io::InputStreamCallback& callback) {
  try {
    const auto content = getFlowFileContentStream(flow_file);
    if (content == nullptr) {
      return 0;
    }

    const auto bytes_processed = callback(content);
    if (bytes_processed < 0) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
    }

    if (metrics_) {
      metrics_->bytesRead() += bytes_processed;
    }
    return bytes_processed;
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception {}", exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during process session read");
    throw;
  }
}
/**
 * Lets the callback transform the flow file's content: it reads from the current content
 * and writes into a newly created claim, which then becomes the flow file's content.
 *
 * @param flow flow file to transform
 * @param callback receives the input stream (positioned at the flow file's offset) and the output
 *        stream; a result converting to false signals failure
 * @return 0 when the flow file has no claim and its size is 0, otherwise the number of bytes written
 * @throws Exception (FILE_OPERATION_EXCEPTION) if there is no claim but the size is non-zero, a stream
 *         cannot be opened, or the callback fails; any exception is logged at debug level and rethrown
 */
int64_t ProcessSessionImpl::readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback) {
  gsl_Expects(callback);

  try {
    std::shared_ptr<ResourceClaim> source_claim = flow->getResourceClaim();
    if (!source_claim) {
      logger_->log_debug("For {}, no resource claim but size is {}", flow->getUUIDStr(), flow->getSize());
      if (flow->getSize() != 0) {
        throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
      }
      return 0;
    }

    std::shared_ptr<io::BaseStream> source_stream = content_session_->read(source_claim);
    if (source_stream == nullptr) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
    }
    source_stream->seek(flow->getOffset());

    std::shared_ptr<ResourceClaim> target_claim = content_session_->create();
    std::shared_ptr<io::BaseStream> target_stream = content_session_->write(target_claim);
    if (target_stream == nullptr) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
    }

    auto transfer_stats = callback(source_stream, target_stream);
    if (!transfer_stats) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
    }

    source_stream->close();
    target_stream->close();

    // Point the flow file at the freshly written content
    flow->setSize(gsl::narrow<uint64_t>(transfer_stats->bytes_written));
    flow->setOffset(0);
    flow->setResourceClaim(target_claim);

    if (metrics_) {
      metrics_->bytesWritten() += transfer_stats->bytes_written;
      metrics_->bytesRead() += transfer_stats->bytes_read;
    }
    return transfer_stats->bytes_written;
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught exception during process session readWrite, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught unknown exception during process session readWrite, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}
/**
 * Reads the whole content of the flow file into a buffer.
 *
 * @param flow flow file to read
 * @return result whose buffer holds the content and whose status is the return value of read()
 * @throws Exception (PROCESSOR_EXCEPTION) if fewer bytes than the stream's size could be read
 */
detail::ReadBufferResult ProcessSessionImpl::readBuffer(const std::shared_ptr<core::FlowFile>& flow) {
  detail::ReadBufferResult result;

  const auto read_everything = [&result, this](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
    result.buffer.resize(input_stream->size());
    const auto read_status = input_stream->read(result.buffer);
    if (read_status == result.buffer.size()) {
      return gsl::narrow<int64_t>(read_status);
    }
    logger_->log_error("readBuffer: {} bytes were requested from the stream but {} bytes were read. Rolling back.", result.buffer.size(), read_status);
    throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
  };

  result.status = read(flow, read_everything);
  return result;
}
/**
 * Rvalue overload: forwards the stream as an lvalue to importFrom(io::InputStream&, flow).
 *
 * @param stream incoming data stream
 * @param flow flow file receiving the content
 */
void ProcessSessionImpl::importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) {  // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
  io::InputStream& source_stream = stream;
  importFrom(source_stream, flow);
}
/**
 * Imports the content of a data stream into a flow file.
 *
 * The stream is copied in chunks of at most getpagesize() bytes into a newly created resource
 * claim, which then becomes the flow file's content (offset 0, size of the written stream).
 * Only the bytes actually read are written, so a short read cannot leak stale buffer data into
 * the content; a failed or empty read before stream.size() bytes were copied, or a failed or
 * partial write, aborts the import.
 *
 * @param stream incoming data stream that contains the data to store
 * @param flow flow file receiving the content
 * @throws Exception (FILE_OPERATION_EXCEPTION) if the content stream cannot be opened, or if
 *         reading the input or writing the content fails; any exception is logged at debug
 *         level and rethrown
 */
void ProcessSessionImpl::importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
  const std::shared_ptr<ResourceClaim> claim = content_session_->create();
  const auto max_read = gsl::narrow_cast<size_t>(getpagesize());
  std::vector<std::byte> buffer(max_read);
  try {
    auto start_time = std::chrono::steady_clock::now();
    std::shared_ptr<io::BaseStream> content_stream = content_session_->write(claim);
    if (nullptr == content_stream) {
      throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
    }

    size_t position = 0;
    const auto max_size = stream.size();
    while (position < max_size) {
      const auto read_size = std::min(max_read, max_size - position);
      const auto subbuffer = gsl::make_span(buffer).subspan(0, read_size);

      const auto bytes_read = stream.read(subbuffer);
      if (io::isError(bytes_read) || bytes_read == 0) {
        // The stream ended or failed before delivering the size it announced
        throw Exception(FILE_OPERATION_EXCEPTION, "Failed to read from the input stream during import");
      }

      // Write only what was actually read; the rest of the buffer holds stale data
      const auto chunk = subbuffer.subspan(0, bytes_read);
      const auto bytes_written = content_stream->write(chunk);
      if (io::isError(bytes_written) || bytes_written != bytes_read) {
        throw Exception(FILE_OPERATION_EXCEPTION, "Failed to write flowfile content during import");
      }
      position += bytes_read;
    }

    // Point the flow file at the imported content
    flow->setSize(content_stream->size());
    flow->setOffset(0);
    flow->setResourceClaim(claim);
    logger_->log_debug("Import offset {} length {} into content {} for FlowFile UUID {}",
        flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr());
    content_stream->close();
    if (metrics_) {
      metrics_->bytesWritten() += content_stream->size();
    }

    std::stringstream details;
    details << process_context_->getProcessor().getName() << " modify flow record content " << flow->getUUIDStr();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
    provenance_report_->modifyContent(*flow, details.str(), duration);
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception during ProcessSession::importFrom, type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during ProcessSession::importFrom, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}
void ProcessSessionImpl::import(const std::string& source, const std::shared_ptr<FlowFile> &flow, bool keepSource, uint64_t offset) {
std::shared_ptr<ResourceClaim> claim = content_session_->create();
size_t size = getpagesize();
std::vector<uint8_t> charBuffer(size);
try {
auto start_time = std::chrono::steady_clock::now();
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
std::shared_ptr<io::BaseStream> stream = content_session_->write(claim);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open new flowfile content for write");
}
if (input.is_open() && input.good()) {
bool invalidWrite = false;
// Open the source file and stream to the flow file
if (offset != 0) {
input.seekg(gsl::narrow<std::streamoff>(offset));
if (!input.good()) {
logger_->log_error("Seeking to {} failed for file {} (does file/filesystem support seeking?)", offset, source);
invalidWrite = true;
}
}
while (input.good()) {
input.read(reinterpret_cast<char*>(charBuffer.data()), gsl::narrow<std::streamsize>(size));
if (input) {
if (io::isError(stream->write(charBuffer.data(), size))) {
invalidWrite = true;
break;
}
} else {
if (io::isError(stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), gsl::narrow<size_t>(input.gcount())))) {
invalidWrite = true;
break;
}
}
}
if (!invalidWrite) {
flow->setSize(stream->size());
flow->setOffset(0);
flow->setResourceClaim(claim);
logger_->log_debug("Import offset {} length {} into content {} for FlowFile UUID {}", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(),
flow->getUUIDStr());
stream->close();
if (metrics_) {
metrics_->bytesWritten() += stream->size();
}
input.close();
if (!keepSource) {
(void)std::remove(source.c_str());
}
std::stringstream details;
details << process_context_->getProcessor().getName() << " modify flow record content " << flow->getUUIDStr();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
provenance_report_->modifyContent(*flow, details.str(), duration);
} else {
stream->close();
input.close();
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
} else {
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during ProcessSession::import, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during ProcessSession::import, type: {}", getCurrentExceptionTypeName());
throw;
}
}
/**
 * Imports a file and splits its content into FlowFiles at every occurrence of inputDelimiter.
 *
 * The file is read in chunks of getpagesize() bytes. Each delimiter-terminated segment becomes a
 * newly created FlowFile that is appended to flows; the delimiter byte itself is not written.
 * A segment may span several chunks: the claim and stream stay open until its delimiter is found.
 * Data after the last delimiter of the file does not produce a FlowFile.
 *
 * @param source path of the file to import
 * @param flows output vector receiving the created FlowFiles
 * @param offset byte offset in the file where reading starts; 0 reads from the beginning
 * @param inputDelimiter byte that terminates a segment
 * @throws Exception (FILE_OPERATION_EXCEPTION) if the file cannot be opened, seeking fails, or the
 *         content stream cannot be opened or written; any exception is logged at debug level and rethrown
 */
void ProcessSessionImpl::import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) {
// State of the segment currently being assembled; reset after each completed FlowFile
std::shared_ptr<ResourceClaim> claim;
std::shared_ptr<io::BaseStream> stream;
std::shared_ptr<core::FlowFile> flowFile;
std::vector<uint8_t> buffer(getpagesize());
try {
std::ifstream input{source, std::ios::in | std::ios::binary};
logger_->log_debug("Opening {}", source);
if (!input.is_open() || !input.good()) {
throw Exception(FILE_OPERATION_EXCEPTION, utils::string::join_pack("File Import Error: failed to open file \'", source, "\'"));
}
if (offset != 0U) {
input.seekg(gsl::narrow<std::streamoff>(offset), std::ifstream::beg);
if (!input.good()) {
logger_->log_error("Seeking to {} failed for file {} (does file/filesystem support seeking?)", offset, source);
throw Exception(FILE_OPERATION_EXCEPTION, utils::string::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
}
}
// Outer loop: one iteration per chunk read from the file
while (input.good()) {
input.read(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(buffer.size()));
std::streamsize read = input.gcount();
if (read < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
}
if (read == 0) {
logger_->log_trace("Finished reading input {}", source);
break;
} else {
logger_->log_trace("Read input of {}", read);
}
// [begin, end) is the part of the current chunk that has not been processed yet
uint8_t* begin = buffer.data();
uint8_t* end = begin + read;
// Inner loop: one iteration per segment found in the current chunk
while (true) {
auto start_time = std::chrono::steady_clock::now();
uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
const auto len = gsl::narrow<size_t>(delimiterPos - begin);
logger_->log_trace("Read input of {} length is {} is at end? {}", read, len, delimiterPos == end);
/*
* We do not want to process the rest of the buffer after the last delimiter if
* - we have reached EOF in the file (we would discard it anyway)
* - there is nothing to process (the last character in the buffer is a delimiter)
*/
if (delimiterPos == end && (input.eof() || len == 0)) {
break;
}
/* Create claim and stream if needed and append data */
if (claim == nullptr) {
start_time = std::chrono::steady_clock::now();
claim = content_session_->create();
}
if (stream == nullptr) {
stream = content_session_->write(claim);
}
if (stream == nullptr) {
logger_->log_error("Stream is null");
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for import");
}
// Write the bytes up to, but not including, the delimiter (or the end of the chunk)
if (stream->write(begin, len) != len) {
logger_->log_error("Error while writing");
stream->close();
throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
}
/* Create a FlowFile if we reached a delimiter */
// No delimiter in the rest of this chunk: keep claim and stream open and continue with the next chunk
if (delimiterPos == end) {
break;
}
flowFile = create();
flowFile->setSize(stream->size());
flowFile->setOffset(0);
flowFile->setResourceClaim(claim);
logger_->log_debug("Import offset {} length {} into content {}, FlowFile UUID {}",
flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
stream->close();
if (metrics_) {
metrics_->bytesWritten() += stream->size();
}
std::string details = process_context_->getProcessor().getName() + " modify flow record content " + flowFile->getUUIDStr();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
provenance_report_->modifyContent(*flowFile, details, duration);
flows.push_back(flowFile);
/* Reset these to start processing the next FlowFile with a clean slate */
flowFile.reset();
stream.reset();
claim.reset();
/* Skip delimiter */
begin = delimiterPos + 1;
}
}
} catch (const std::exception& exception) {
logger_->log_debug("Caught Exception during ProcessSession::import, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during ProcessSession::import, type: {}", getCurrentExceptionTypeName());
throw;
}
}
/**
 * Splits a file into FlowFiles at inputDelimiter and optionally removes the file afterwards.
 *
 * Delegates the import to import(source, flows, offset, inputDelimiter).
 *
 * @param source path of the file to import
 * @param flows output vector receiving the created FlowFiles
 * @param keepSource when false, the source file is removed after the import
 * @param offset byte offset in the file where reading starts
 * @param inputDelimiter byte that terminates a segment
 */
void ProcessSessionImpl::import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) {
// this function calls a deprecated function, but it is itself deprecated, so suppress warnings
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#elif defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#elif defined(WIN32)
#pragma warning(push)
#pragma warning(disable: 4996)
#endif
  import(source, flows, offset, inputDelimiter);
#if defined(__clang__)
#pragma clang diagnostic pop
#elif defined(__GNUC__)
#pragma GCC diagnostic pop
#elif defined(WIN32)
#pragma warning(pop)
#endif

  logger_->log_trace("Closed input {}, keeping source ? {}", source, keepSource);
  if (keepSource) {
    return;
  }
  // The result of the removal is deliberately ignored
  (void)std::remove(source.c_str());
}
/**
 * Exports the content of a flow file to a destination through a temporary file.
 *
 * The content is read through a ProcessSessionReadCallback built from the temporary file,
 * the destination and the logger, and the callback is then committed.
 *
 * @param destination path the content should end up at
 * @param tmpFile path of the temporary file handed to the read callback
 * @param flow flow file to export
 * @return true if the callback's commit succeeded
 */
bool ProcessSessionImpl::exportContent(const std::string &destination, const std::string &tmpFile, const std::shared_ptr<core::FlowFile> &flow, bool /*keepContent*/) {
  logger_->log_debug("Exporting content of {} to {}", flow->getUUIDStr(), destination);

  ProcessSessionReadCallback export_callback(tmpFile, destination, logger_);
  read(flow, std::ref(export_callback));

  logger_->log_info("Committing {}", destination);
  const bool committed = export_callback.commit();
  if (!committed) {
    logger_->log_error("Commit of {} to {} failed!", flow->getUUIDStr(), destination);
    return false;
  }

  logger_->log_info("Commit OK.");
  return true;
}
/**
 * Exports the content of a flow file, using "<destination>.<generated UUID>" as the temporary file.
 *
 * @param destination path the content should end up at
 * @param flow flow file to export
 * @param keepContent forwarded unchanged to the four-argument overload
 * @return the result of the four-argument overload
 */
bool ProcessSessionImpl::exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) {
  std::stringstream temp_path;
  temp_path << destination << "." << id_generator_->generate().to_string();
  return exportContent(destination, temp_path.str(), flow, keepContent);
}
/**
 * Moves the flow file's current resource claim into its stash under the given key.
 *
 * Logs a warning and does nothing when the flow file has no resource claim.
 *
 * @param key name to stash the claim under
 * @param flow flow file whose content is stashed
 */
void ProcessSessionImpl::stash(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
  logger_->log_debug("Stashing content from {} to key {}", flow->getUUIDStr(), key);

  const auto current_claim = flow->getResourceClaim();
  if (current_claim == nullptr) {
    logger_->log_warn("Attempted to stash content of record {} when "
                      "there is no resource claim",
                      flow->getUUIDStr());
    return;
  }

  // Remember the claim under the key, then detach it from the flow file
  flow->setStashClaim(key, current_claim);
  flow->clearResourceClaim();
}
/**
 * Makes the claim stashed under the given key the flow file's resource claim again.
 *
 * Logs a warning and does nothing when no claim is stashed under the key. When the flow file
 * already has a resource claim, a warning is logged and the claim is replaced.
 *
 * @param key name the claim was stashed under
 * @param flow flow file whose content is restored
 */
void ProcessSessionImpl::restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
  logger_->log_info("Restoring content to {} from key {}", flow->getUUIDStr(), key);

  const bool key_known = flow->hasStashClaim(key);
  if (!key_known) {
    logger_->log_warn("Requested restore to record {} from unknown key {}", flow->getUUIDStr(), key);
    return;
  }

  if (flow->getResourceClaim() != nullptr) {
    logger_->log_warn("Restoring stashed content of record {} from key {} when there is "
                      "existing content; existing content will be overwritten",
                      flow->getUUIDStr(), key);
  }

  // Swap the stashed claim back in and drop the stash entry
  const auto stashed_claim = flow->getStashClaim(key);
  flow->setResourceClaim(stashed_claim);
  flow->clearStashClaim(key);
}
/**
 * Assigns a flow file to the outgoing connections of the relationship it was transferred to.
 *
 * The first connection receives the flow file itself; every further connection receives a
 * clone made with cloneDuringTransfer(). Without connections, the flow file is removed when
 * the relationship is auto-terminated.
 *
 * @param record flow file to route
 * @param transfer_callback invoked for every flow file (original or clone) together with the relationship
 * @return Ok_Deleted for deleted flow files, Error_NoRelationship when no relationship was recorded,
 *         Ok_AutoTerminated when the flow file was removed, Ok_Routed otherwise
 * @throws Exception (PROCESS_SESSION_EXCEPTION) if a relationship that is not auto-terminated has no
 *         connection, or if cloning fails
 */
ProcessSessionImpl::RouteResult ProcessSessionImpl::routeFlowFile(const std::shared_ptr<FlowFile> &record, const std::function<void(const FlowFile&, const Relationship&)>& transfer_callback) {
  if (record->isDeleted()) {
    return RouteResult::Ok_Deleted;
  }

  // Look up the relationship: updated flow files first, then the ones added by this session
  const utils::Identifier record_id = record->getUUID();
  Relationship target;
  if (const auto updated = updated_relationships_.find(record_id); updated != updated_relationships_.end()) {
    gsl_Expects(updated->second);
    target = *updated->second;
  } else if (const auto added = added_flowfiles_.find(record_id); added != added_flowfiles_.end() && added->second.rel) {
    target = *added->second.rel;
  } else {
    return RouteResult::Error_NoRelationship;
  }

  const auto connections = process_context_->getProcessor().getOutGoingConnections(target.getName());
  if (connections.empty()) {
    if (!process_context_->getProcessor().isAutoTerminated(target)) {
      // A relationship that is not auto-terminated must lead somewhere
      throw Exception(PROCESS_SESSION_EXCEPTION, "Connect empty for non auto terminated relationship " + target.getName());
    }
    remove(record);
    transfer_callback(*record, target);
    return RouteResult::Ok_AutoTerminated;
  }

  bool first_connection = true;
  for (auto connection : connections) {
    if (first_connection) {
      // The original flow file goes to the first connection
      first_connection = false;
      record->setConnection(connection);
      transfer_callback(*record, target);
      continue;
    }

    // Every further connection gets its own clone
    const std::shared_ptr<core::FlowFile> duplicate = this->cloneDuringTransfer(*record);
    if (!duplicate) {
      throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer " + record->getUUIDStr());
    }
    duplicate->setConnection(connection);
    transfer_callback(*duplicate, target);
  }
  return RouteResult::Ok_Routed;
}
/**
 * Commits the session.
 *
 * Steps, in order:
 *  1. route every updated and added flow file to its connection(s), collecting per-relationship transfer metrics
 *  2. group the non-deleted flow files (updated, added, cloned) by the connection they were assigned to
 *  3. delete the flow files marked as deleted from the flow file repository
 *  4. make sure every outgoing flow file has a resource claim, commit the content session and the state manager
 *  5. persist the outgoing flow files, then hand them over to their connections
 *  6. publish the metrics, clear the session bookkeeping and commit the provenance report
 *
 * Any exception raised on the way is logged at debug level and rethrown.
 *
 * @throws Exception if a flow file has no transfer relationship or if the state manager commit fails
 */
void ProcessSessionImpl::commit() {
const auto commit_start_time = std::chrono::steady_clock::now();
try {
// per-relationship transfer statistics, keyed by relationship name
std::unordered_map<std::string, TransferMetrics> transfers;
auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) {
++transfers[relationship.getName()].transfer_count;
transfers[relationship.getName()].transfer_size += record.getSize();
};
// First we clone the flow record based on the transferred relationship for updated flow record
for (auto && it : updated_flowfiles_) {
auto record = it.second.modified;
if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the updated flow " + record->getUUIDStr());
}
}
// Do the same thing for added flow file
for (const auto& it : added_flowfiles_) {
auto record = it.second.flow_file;
if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
// Can not find relationship for the flow
throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the added flow " + record->getUUIDStr());
}
}
// flow files to be enqueued, grouped by the connection they were assigned to
std::map<Connectable*, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
Connectable* connection = nullptr;
// Collect the updated and added flow files of the session; deleted ones and ones without a connection are skipped
for (const auto &it : updated_flowfiles_) {
auto record = it.second.modified;
logger_->log_trace("See {} in {}", record->getUUIDStr(), "updated_flowfiles_");
if (record->isDeleted()) {
continue;
}
connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
}
for (const auto &it : added_flowfiles_) {
auto record = it.second.flow_file;
logger_->log_trace("See {} in {}", record->getUUIDStr(), "added_flowfiles_");
if (record->isDeleted()) {
continue;
}
connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
}
// Process the cloned flow files the same way
for (const auto &record : cloned_flowfiles_) {
logger_->log_trace("See {} in {}", record->getUUIDStr(), "cloned_flowfiles_");
if (record->isDeleted()) {
continue;
}
connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
}
// Remove the flow files that are still marked as deleted from the flow file repository
for (const auto& record : deleted_flowfiles_) {
if (!record->isDeleted()) {
continue;
}
if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
// mark for deletion in the flowFileRepository
record->setStoredToRepository(false);
}
}
// every outgoing flow file gets a resource claim before the content session is committed
ensureNonNullResourceClaim(connectionQueues);
content_session_->commit();
if (stateManager_ && !stateManager_->commit()) {
throw Exception(PROCESS_SESSION_EXCEPTION, "State manager commit failed.");
}
// persist the flow files before they are put into the connections
persistFlowFilesBeforeTransfer(connectionQueues, updated_flowfiles_);
for (auto& cq : connectionQueues) {
// a Connection receives its flow files in one batch, any other Connectable one by one
auto connection_from_queue = dynamic_cast<Connection*>(cq.first);
if (connection_from_queue) {
connection_from_queue->multiPut(cq.second);
} else {
for (auto& file : cq.second) {
cq.first->put(file);
}
}
}
// publish the transfer statistics gathered during routing
if (metrics_) {
for (const auto& [relationship_name, transfer_metrics] : transfers) {
metrics_->transferredBytes() += transfer_metrics.transfer_size;
metrics_->transferredFlowFiles() += transfer_metrics.transfer_count;
metrics_->increaseRelationshipTransferCount(relationship_name, transfer_metrics.transfer_count);
}
}
// All done, reset the session bookkeeping
updated_flowfiles_.clear();
added_flowfiles_.clear();
cloned_flowfiles_.clear();
deleted_flowfiles_.clear();
updated_relationships_.clear();
relationships_.clear();
// persist the provenance report
this->provenance_report_->commit();
logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessor().getName());
if (metrics_) {
// the elapsed commit time is recorded both as the last commit runtime and as processing time
auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
metrics_->processingNanos() += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
}
} catch (const std::exception& exception) {
// log and propagate: the caller decides how to react to the failed commit
logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during process session commit, type: {}", getCurrentExceptionTypeName());
throw;
}
}
void ProcessSessionImpl::rollback() {
// new FlowFiles are only persisted during commit
// no need to delete them here
std::map<Connectable*, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
try {
// Requeue the snapshot of the flowfile back
for (const auto &it : updated_flowfiles_) {
auto flowFile = it.second.modified;
// restore flowFile to original state
flowFile->copy(*it.second.snapshot);
penalize(flowFile);
logger_->log_debug("ProcessSession rollback for {}, record {}, to connection {}",
process_context_->getProcessor().getName(),
flowFile->getUUIDStr(),
flowFile->getConnection()->getName());
connectionQueues[flowFile->getConnection()].push_back(flowFile);
}
for (const auto& record : deleted_flowfiles_) {
record->setDeleted(false);
}
// put everything back where it came from
for (auto& cq : connectionQueues) {
auto connection = dynamic_cast<Connection*>(cq.first);
if (connection) {
connection->multiPut(cq.second);
} else {
for (auto& flow : cq.second) {
cq.first->put(flow);
}
}
}
content_session_->rollback();
if (stateManager_ && !stateManager_->rollback()) {
throw Exception(PROCESS_SESSION_EXCEPTION, "State manager rollback failed.");
}
cloned_flowfiles_.clear();
added_flowfiles_.clear();
updated_flowfiles_.clear();
deleted_flowfiles_.clear();
relationships_.clear();
logger_->log_warn("ProcessSession rollback for {} executed", process_context_->getProcessor().getName());
} catch (const std::exception& exception) {
logger_->log_warn("Caught Exception during process session rollback, type: {}, what: {}", typeid(exception).name(), exception.what());
throw;
} catch (...) {
logger_->log_warn("Caught Exception during process session rollback, type: {}", getCurrentExceptionTypeName());
throw;
}
}
/**
 * Non-throwing variant of rollback().
 *
 * @return an empty expected on success, or the what() message of the std::exception
 *         thrown by rollback(). Any other exception type is logged as critical and
 *         terminates the process.
 */
nonstd::expected<void, std::string> ProcessSessionImpl::rollbackNoThrow() noexcept {
  try {
    rollback();
  } catch(const std::exception& rollback_error) {
    // report the failure to the caller as an error value
    return nonstd::make_unexpected(rollback_error.what());
  } catch(...) {
    // nothing meaningful can be reported for an unknown exception type
    logger_->log_critical("Caught unknown exception during rollback, type: {}, terminating", getCurrentExceptionTypeName());
    std::terminate();
  }
  return {};
}
/**
 * Persists the outgoing flow files to the flow file repository before they are handed over
 * to their targets.
 *
 * A flow file counts as "dropped" when its target is a Connection that drops empty flow files
 * and the flow file has size 0; every other flow file counts as "transferred".
 * Transferred flow files are serialized and stored with a single MultiPut; dropped flow files
 * that are already stored are deleted from the repository.
 *
 * @param transactionMap    outgoing flow files grouped by their target
 * @param modifiedFlowFiles the session's updated flow files, used to find the snapshot
 *                          (original state) of a flow file, if it has one
 * @throws Exception if the repository MultiPut fails
 */
void ProcessSessionImpl::persistFlowFilesBeforeTransfer(
    std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
    const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) {

  std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData;

  auto flowFileRepo = process_context_->getFlowFileRepository();

  enum class Type {
    Dropped, Transferred
  };

  // Calls fn(flow_file, snapshot_or_null) for every flow file of the requested type
  auto forEachFlowFile = [&] (Type type, auto fn) {
    for (auto& [target, flows] : transactionMap) {
      const auto connection = dynamic_cast<Connection*>(target);
      const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
      for (auto &ff : flows) {
        auto snapshotIt = modifiedFlowFiles.find(ff->getUUID());
        auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
        if (shouldDropEmptyFiles && ff->getSize() == 0) {
          // the receiver will drop this FF
          if (type == Type::Dropped) {
            fn(ff, original);
          }
        } else {
          if (type == Type::Transferred) {
            fn(ff, original);
          }
        }
      }
    }
  };

  // collect serialized flowfiles
  forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
    auto stream = std::make_unique<io::BufferStream>();
    std::dynamic_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream);
    flowData.emplace_back(ff->getUUIDStr(), std::move(stream));
  });

  // increment on behalf of the to be persisted instance
  forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
    if (auto claim = ff->getResourceClaim())
      claim->increaseFlowFileRecordOwnedCount();
  });

  if (!flowFileRepo->MultiPut(flowData)) {
    logger_->log_error("Failed execute multiput on FF repo!");
    // decrement on behalf of the failed persisted instance
    forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
      if (auto claim = ff->getResourceClaim())
        claim->decreaseFlowFileRecordOwnedCount();
    });
    throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository");
  }

  // decrement on behalf of the overridden instance if any
  forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& original) {
    if (auto original_claim = original ? original->getResourceClaim() : nullptr) {
      original_claim->decreaseFlowFileRecordOwnedCount();
    }
    ff->setStoredToRepository(true);
  });

  forEachFlowFile(Type::Dropped, [&] (auto& ff, auto& original) {
    // the receiver promised to drop this FF, no need for it anymore
    if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
      // original must be non-null since this flowFile is already stored in the repos ->
      // must have come from a session->get()
      gsl_Assert(original);
      ff->setStoredToRepository(false);
    }
  });
}
/**
 * Gives every outgoing flow file that has no resource claim an empty content,
 * by writing an empty buffer stream to it.
 *
 * @param transactionMap outgoing flow files grouped by their target
 */
void ProcessSessionImpl::ensureNonNullResourceClaim(
    const std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile>>> &transactionMap) {
  for (const auto& queue_entry : transactionMap) {
    for (const auto& pending_file : queue_entry.second) {
      if (pending_file->getResourceClaim()) {
        // content already present, nothing to do
        continue;
      }

      logger_->log_debug("Processor {} ({}) did not create a ResourceClaim, creating an empty one",
          process_context_->getProcessor().getUUIDStr(),
          process_context_->getProcessor().getName());
      io::BufferStream no_content;
      write(pending_file, OutputStreamPipe{no_content});
    }
  }
}
std::shared_ptr<core::FlowFile> ProcessSessionImpl::get() {
const auto first = process_context_->getProcessor().pickIncomingConnection();
if (first == nullptr) {
logger_->log_trace("Get is null for {}", process_context_->getProcessor().getName());
return nullptr;
}
auto current = dynamic_cast<Connection*>(first);
if (!current) {
logger_->log_error("The incoming connection [{}] of the processor [{}] \"{}\" is not actually a Connection.",
first->getUUIDStr(), process_context_->getProcessor().getUUIDStr(), process_context_->getProcessor().getName());
return {};
}
do {
std::set<std::shared_ptr<core::FlowFile> > expired;
std::shared_ptr<core::FlowFile> ret = current->poll(expired);
if (!expired.empty()) {
// Remove expired flow record
for (const auto& record : expired) {
std::stringstream details;
details << process_context_->getProcessor().getName() << " expire flow record " << record->getUUIDStr();
provenance_report_->expire(*record, details.str());
// there is no rolling back expired FlowFiles
if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
record->setStoredToRepository(false);
}
}
}
if (ret) {
// add the flow record to the current process session update map
ret->setDeleted(false);
std::shared_ptr<FlowFile> snapshot = std::make_shared<FlowFileRecordImpl>();
snapshot->copy(*ret);
logger_->log_debug("Create Snapshot FlowFile with UUID {}", snapshot->getUUIDStr());
utils::Identifier uuid = ret->getUUID();
updated_flowfiles_[uuid] = {ret, snapshot};
auto flow_version = process_context_->getProcessor().getFlowIdentifier();
if (flow_version != nullptr) {
ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
}
if (metrics_) {
metrics_->incomingBytes() += ret->getSize();
++metrics_->incomingFlowFiles();
}
return ret;
}
current = dynamic_cast<Connection*>(process_context_->getProcessor().pickIncomingConnection());
} while (current != nullptr && current != first);
return nullptr;
}
/**
 * Commits the content session on its own, without committing the rest of the process session.
 */
void ProcessSessionImpl::flushContent() {
content_session_->commit();
}
bool ProcessSessionImpl::outgoingConnectionsFull(const std::string& relationship) {
std::set<Connectable*> connections = process_context_->getProcessor().getOutGoingConnections(relationship);
Connection * connection = nullptr;
for (const auto conn : connections) {
connection = dynamic_cast<Connection*>(conn);
if (connection && connection->backpressureThresholdReached()) {
return true;
}
}
return false;
}
/**
 * Tells whether any flow file of this session, updated or added, has the given
 * relationship recorded as its transfer target.
 *
 * @param relationship relationship to look for
 * @return true if at least one flow file is recorded for that relationship
 */
bool ProcessSessionImpl::existsFlowFileInRelationship(const Relationship &relationship) {
  // flow files taken from an incoming connection
  for (const auto& updated_entry : updated_relationships_) {
    if (updated_entry.second && relationship == *updated_entry.second) {
      return true;
    }
  }

  // flow files created within this session
  for (const auto& added_entry : added_flowfiles_) {
    if (added_entry.second.rel && relationship == *added_entry.second.rel) {
      return true;
    }
  }

  return false;
}
/**
 * Tells whether a (non-null) relationship is recorded for the given flow file in this
 * session, either among the updated or among the added flow files.
 *
 * @param flow flow file to check
 * @return true if a relationship is recorded for the flow file
 */
bool ProcessSessionImpl::hasBeenTransferred(const core::FlowFile &flow) const {
  const auto flow_id = flow.getUUID();

  const auto updated_entry = updated_relationships_.find(flow_id);
  if (updated_entry != updated_relationships_.end() && updated_entry->second != nullptr) {
    return true;
  }

  const auto added_entry = added_flowfiles_.find(flow_id);
  return added_entry != added_flowfiles_.end() && added_entry->second.rel != nullptr;
}
} // namespace org::apache::nifi::minifi::core