// blob: eb94b693f618c9f4640396d9f975852f1e5e6429 [file] [log] [blame]
/**
* @file UnfocusArchiveEntry.cpp
* UnfocusArchiveEntry class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "UnfocusArchiveEntry.h"
#include <iostream>
#include <fstream>
#include <memory>
#include <string>
#include <system_error>
#include "SmartArchivePtrs.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "utils/ConfigurationUtils.h"
#include "minifi-cpp/utils/gsl.h"
namespace {
inline constexpr auto BUFFER_SIZE = org::apache::nifi::minifi::utils::configuration::DEFAULT_BUFFER_SIZE;
} // namespace
namespace org::apache::nifi::minifi::processors {
void UnfocusArchiveEntry::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Rebuilds an archive from the lens metadata stored in the "lens.archive.stack" attribute and writes it
 * back as the content of the flow file.
 *
 * Steps:
 *  1. Pop the top ArchiveMetadata from the stack attribute and store the remaining stack back.
 *  2. Export the current flow file content to the tmp file of the focused entry.
 *  3. Restore every other regular, non-empty entry from its stash key and export it to its tmp file.
 *  4. Reset the "filename", "path" and "absolute.path" attributes from the archive name.
 *  5. Write the archive through WriteCallback and transfer the flow file to Success.
 *
 * When the stack attribute is missing or cannot be parsed, the context is yielded and the function
 * returns without transferring the flow file.
 *
 * @param context used only to yield on unusable lens metadata
 * @param session provides the flow file and the export/restore/write/transfer operations
 */
void UnfocusArchiveEntry::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  auto flowFile = session.get();
  if (!flowFile) {
    return;
  }

  utils::file::FileManager file_man;
  ArchiveMetadata lensArchiveMetadata;

  // Get lens stack from attribute
  {
    ArchiveStack archiveStack;
    {
      std::string existingLensStack;
      if (flowFile->getAttribute("lens.archive.stack", existingLensStack)) {
        logger_->log_info("UnfocusArchiveEntry loading existing lens context");
        try {
          archiveStack.loadJsonString(existingLensStack);
        } catch (const Exception& exception) {
          logger_->log_debug("{}", exception.what());
          context.yield();
          return;
        }
      } else {
        logger_->log_error("UnfocusArchiveEntry lens metadata not found");
        context.yield();
        return;
      }
    }

    // NOTE(review): the behaviour of pop() on an empty stack is not visible here - confirm it is safe.
    lensArchiveMetadata = archiveStack.pop();
    lensArchiveMetadata.seedTempPaths(&file_man, false);

    // Persist the stack without the popped element so nested lenses keep working
    {
      std::string stackStr = archiveStack.toJsonString();
      flowFile->setAttribute("lens.archive.stack", stackStr);
    }
  }

  // Export focused entry to tmp file.
  // This has to happen before the loop below: restoring a stash replaces the flow file content,
  // so the focused content would no longer be available afterwards.
  for (const auto& entry : lensArchiveMetadata.entryMetadata) {
    if (entry.entryType != AE_IFREG || entry.entrySize == 0) {
      continue;
    }

    if (entry.entryName == lensArchiveMetadata.focusedEntry) {
      logger_->log_debug("UnfocusArchiveEntry exporting focused entry to {}", entry.tmpFileName);
      session.exportContent(entry.tmpFileName.string(), flowFile, false);
    }
  }

  // Restore/export entries from stash, one-by-one, to tmp files
  for (const auto& entry : lensArchiveMetadata.entryMetadata) {
    if (entry.entryType != AE_IFREG || entry.entrySize == 0) {
      continue;
    }

    if (entry.entryName == lensArchiveMetadata.focusedEntry) {
      continue;
    }

    logger_->log_debug("UnfocusArchiveEntry exporting entry {} to {}", entry.stashKey, entry.tmpFileName);
    session.restore(entry.stashKey, flowFile);
    // TODO(calebj) implement copy export/don't worry about multiple claims/optimal efficiency for *now*
    session.exportContent(entry.tmpFileName.string(), flowFile, false);
  }

  if (lensArchiveMetadata.archiveName.empty()) {
    flowFile->removeAttribute("filename");
    flowFile->removeAttribute("path");
    flowFile->removeAttribute("absolute.path");
  } else {
    std::string abs_path = lensArchiveMetadata.archiveName;
    std::size_t found = abs_path.find_last_of("/\\");
    // NOTE(review): when the archive name has no separator, found is npos, so both "path" and
    // "filename" end up holding the whole name - confirm whether "path" should be empty instead.
    std::string path = abs_path.substr(0, found);
    std::string name = abs_path.substr(found + 1);
    flowFile->setAttribute("filename", name);
    flowFile->setAttribute("path", path);
    flowFile->setAttribute("absolute.path", abs_path);
  }

  // Create archive by restoring each entry in the archive from tmp files
  WriteCallback cb(&lensArchiveMetadata);
  session.write(flowFile, std::cref(cb));

  // Transfer to the relationship
  session.transfer(flowFile, Success);
}
UnfocusArchiveEntry::WriteCallback::WriteCallback(ArchiveMetadata *archiveMetadata)
: _archiveMetadata(archiveMetadata) {
}
// Client data handed to libarchive in archive_write_open and read back in write_cb.
struct UnfocusArchiveEntryWriteData {
  // Destination of the archive bytes
  std::shared_ptr<io::OutputStream> stream{};
};
/**
 * Write callback given to archive_write_open: forwards the received bytes to the wrapped output stream.
 *
 * @param d      client data; must point to an UnfocusArchiveEntryWriteData
 * @param buffer bytes to write
 * @param length number of bytes available in buffer
 * @return the number of bytes the stream reported as written, or -1 if the stream reported an error
 */
la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *d, const void *buffer, size_t length) {
  auto* const write_data = static_cast<UnfocusArchiveEntryWriteData *>(d);
  const auto* const bytes = static_cast<const uint8_t*>(buffer);
  const auto bytes_written = write_data->stream->write(bytes, length);
  if (io::isError(bytes_written)) {
    return -1;
  }
  return gsl::narrow<la_ssize_t>(bytes_written);
}
/**
 * Writes every entry described by the archive metadata into a new archive streamed to `stream`.
 *
 * Regular (AE_IFREG), non-empty entries take their content from the entry's tmp file, which is removed
 * once it has been processed. All other entries are written as header only.
 *
 * Failures of individual libarchive or file operations are logged and processing continues with the
 * next entry; once a data write fails, the remaining content of that entry is skipped.
 *
 * @param stream output stream receiving the archive bytes through write_cb
 * @return the sum of the byte counts reported by archive_write_data for all entries
 */
int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io::OutputStream>& stream) const {
  UnfocusArchiveEntryWriteData data;
  data.stream = stream;
  auto output_archive = archive_write_unique_ptr{archive_write_new()};
  int64_t nlen = 0;

  // archive_error_string() can return a null pointer when no message was recorded; never hand that to the logger
  const auto last_archive_error = [&output_archive]() -> const char* {
    const char* const message = archive_error_string(output_archive.get());
    return message != nullptr ? message : "unknown error";
  };

  archive_write_set_format(output_archive.get(), _archiveMetadata->archiveFormat);

  // data must outlive the archive because it writes during free
  if (archive_write_open(output_archive.get(), &data, nullptr, write_cb, nullptr) != ARCHIVE_OK) {
    logger_->log_error("UnfocusArchiveEntry could not cleanly open the output archive: {}", last_archive_error());
  }

  // Iterate entries & write from tmp file to archive
  std::array<char, BUFFER_SIZE> buf{};
  auto entry = archive_entry_unique_ptr{archive_entry_new()};

  for (const auto& entryMetadata : _archiveMetadata->entryMetadata) {
    logger_->log_info("UnfocusArchiveEntry writing entry {}", entryMetadata.entryName);

    const bool has_content = entryMetadata.entryType == AE_IFREG && entryMetadata.entrySize > 0;

    // Value-initialised per entry so a failed stat() never exposes values of a previous entry
    struct stat st{};
    if (has_content) {
      const int stat_result = stat(entryMetadata.tmpFileName.string().c_str(), &st);
      if (stat_result != 0) {
        logger_->log_error("Error statting {}: {}", entryMetadata.tmpFileName, std::system_category().default_error_condition(errno).message());
      } else {
        archive_entry_copy_stat(entry.get(), &st);
      }
    }

    // The stored metadata takes precedence over whatever the tmp file's stat reported
    archive_entry_set_filetype(entry.get(), entryMetadata.entryType);
    archive_entry_set_pathname(entry.get(), entryMetadata.entryName.c_str());
    archive_entry_set_perm(entry.get(), entryMetadata.entryPerm);
    archive_entry_set_size(entry.get(), gsl::narrow<la_int64_t>(entryMetadata.entrySize));
    archive_entry_set_uid(entry.get(), entryMetadata.entryUID);
    archive_entry_set_gid(entry.get(), entryMetadata.entryGID);
    archive_entry_set_mtime(entry.get(), entryMetadata.entryMTime, gsl::narrow<long>(entryMetadata.entryMTimeNsec));  // NOLINT long comes from libarchive API

    logger_->log_info("Writing {} with type {}, perms {}, size {}, uid {}, gid {}, mtime {},{}", entryMetadata.entryName, entryMetadata.entryType, entryMetadata.entryPerm,
        entryMetadata.entrySize, entryMetadata.entryUID, entryMetadata.entryGID, entryMetadata.entryMTime, entryMetadata.entryMTimeNsec);

    if (archive_write_header(output_archive.get(), entry.get()) != ARCHIVE_OK) {
      logger_->log_error("UnfocusArchiveEntry could not cleanly write the header of archive entry {}: {}", entryMetadata.entryName, last_archive_error());
    }

    // If entry is regular file, copy entry contents
    if (has_content) {
      logger_->log_info("UnfocusArchiveEntry writing {} bytes of "
                        "data from tmp file {} to archive entry {}",
                        st.st_size, entryMetadata.tmpFileName.string(), entryMetadata.entryName);
      std::ifstream ifs(entryMetadata.tmpFileName, std::ifstream::in | std::ios::binary);
      if (!ifs.is_open()) {
        logger_->log_error("UnfocusArchiveEntry failed to open tmp file {} for archive entry {}", entryMetadata.tmpFileName.string(), entryMetadata.entryName);
      }

      while (ifs.good()) {
        ifs.read(buf.data(), buf.size());
        const auto len = gsl::narrow<size_t>(ifs.gcount());
        const int64_t written = archive_write_data(output_archive.get(), buf.data(), len);
        if (written < 0) {
          logger_->log_error("UnfocusArchiveEntry failed to write data to "
                             "archive entry {} due to error: {}",
                             entryMetadata.entryName, last_archive_error());
          // Further chunks of this entry cannot be written meaningfully; stop instead of repeating the error
          break;
        }
        nlen += written;
      }

      ifs.close();

      // Remove the tmp file as we are through with it
      std::filesystem::remove(entryMetadata.tmpFileName);
    }

    // Reset the shared entry object before it is reused for the next entry
    archive_entry_clear(entry.get());
  }

  return nlen;
}
// Registers UnfocusArchiveEntry as a Processor resource.
// NOTE(review): the macro is presumably provided by core/Resource.h - confirm.
REGISTER_RESOURCE(UnfocusArchiveEntry, Processor);
} // namespace org::apache::nifi::minifi::processors