blob: 5602f4ca204db2b21c8ccffd26c10ad56b022a14 [file] [log] [blame]
/**
* @file FocusArchiveEntry.cpp
* FocusArchiveEntry class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FocusArchiveEntry.h"
#include <archive.h>
#include <archive_entry.h>
#include <string.h>
#include <string>
#include <set>
#include <iostream>
#include <fstream>
#include <memory>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "Exception.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Generator used by onTrigger() to create a fresh stash key for every extracted regular-file entry.
std::shared_ptr<utils::IdGenerator> FocusArchiveEntry::id_generator_ = utils::IdGenerator::getIdGenerator();
// "Path" property: names the archive entry to focus. The third constructor
// argument is empty (presumably the default value -- confirm against core::Property).
core::Property FocusArchiveEntry::Path("Path", "The path within the archive to focus (\"/\" to focus the total archive)", "");
// Relationship that onTrigger() transfers the flow file to once the entry has been focused.
core::Relationship FocusArchiveEntry::Success("success", "success operational on the flow record");
/**
 * Writes an attribute on a flow file whether or not it is already present.
 *
 * An update is attempted first; the attribute is added only when the update
 * reports failure.
 *
 * @param flowFile flow file whose attribute is written
 * @param key attribute name
 * @param value attribute value
 * @return true when either the update or the subsequent add succeeded
 */
bool FocusArchiveEntry::set_or_update_attr(std::shared_ptr<core::FlowFile> flowFile, const std::string& key, const std::string& value) const {
  // Short-circuit evaluation: addAttribute() runs only if updateAttribute() failed.
  return flowFile->updateAttribute(key, value) || flowFile->addAttribute(key, value);
}
void FocusArchiveEntry::initialize() {
//! Set the supported properties
std::set<core::Property> properties;
properties.insert(Path);
setSupportedProperties(properties);
//! Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
/**
 * Focuses one entry of the archive carried by the incoming flow file.
 *
 * The archive content is read through ReadCallback, which records metadata for
 * every entry and extracts regular files to temporary files. Each regular-file
 * entry is then imported into the flow file and stashed under a newly
 * generated key; the entry whose name equals the Path property is restored as
 * the flow file content. The archive metadata is pushed onto the
 * "lens.archive.stack" attribute, the filename/path/absolute.path attributes
 * are rewritten to describe the focused entry, and the flow file is
 * transferred to Success.
 *
 * @param context supplies the Path property; yielded when an existing lens stack cannot be parsed
 * @param session supplies the flow file and performs read/import/stash/restore/transfer
 */
void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
  auto flowFile = session->get();
  if (!flowFile) {
    return;
  }

  fileutils::FileManager file_man;

  // Extract archive contents
  ArchiveMetadata archiveMetadata;
  context->getProperty(Path.getName(), archiveMetadata.focusedEntry);
  flowFile->getAttribute("filename", archiveMetadata.archiveName);

  ReadCallback cb(this, &file_man, &archiveMetadata);
  session->read(flowFile, &cb);

  // For each extracted entry, import & stash to key
  std::string targetEntryStashKey;
  for (auto &entryMetadata : archiveMetadata.entryMetadata) {
    // Only regular files were extracted to a temporary file by the read callback.
    if (entryMetadata.entryType != AE_IFREG) {
      continue;
    }

    logger_->log_info("FocusArchiveEntry importing %s from %s", entryMetadata.entryName, entryMetadata.tmpFileName);
    session->import(entryMetadata.tmpFileName, flowFile, false, 0);

    utils::Identifier stashKeyUuid;
    id_generator_->generate(stashKeyUuid);
    logger_->log_debug("FocusArchiveEntry generated stash key %s for entry %s", stashKeyUuid.to_string(), entryMetadata.entryName);
    entryMetadata.stashKey.assign(stashKeyUuid.to_string());

    if (entryMetadata.entryName == archiveMetadata.focusedEntry) {
      targetEntryStashKey = entryMetadata.stashKey;
    }

    // Stash the content
    session->stash(entryMetadata.stashKey, flowFile);
  }

  // Restore target archive entry
  if (!targetEntryStashKey.empty()) {
    session->restore(targetEntryStashKey, flowFile);
  } else {
    logger_->log_warn("FocusArchiveEntry failed to locate target entry: %s",
                      archiveMetadata.focusedEntry.c_str());
  }

  // Set new/updated lens stack to attribute
  {
    ArchiveStack archiveStack;
    std::string existingLensStack;
    if (flowFile->getAttribute("lens.archive.stack", existingLensStack)) {
      logger_->log_info("FocusArchiveEntry loading existing lens context");
      try {
        archiveStack.loadJsonString(existingLensStack);
      } catch (const Exception &exception) {
        // The message is data, not a format string: a '%' inside it must not
        // be interpreted as a conversion specifier.
        logger_->log_debug("%s", exception.what());
        // NOTE(review): the flow file is neither transferred nor removed on
        // this path -- confirm that the session handles this as intended.
        context->yield();
        return;
      }
    }
    archiveStack.push(archiveMetadata);
    std::string stackStr = archiveStack.toJsonString();
    if (!flowFile->updateAttribute("lens.archive.stack", stackStr)) {
      flowFile->addAttribute("lens.archive.stack", stackStr);
    }
  }

  // Update filename attribute to that of focused entry
  const auto &focusedEntry = archiveMetadata.focusedEntry;
  const std::size_t separator = focusedEntry.find_last_of("/\\");
  std::string path;
  std::string name;
  if (separator == std::string::npos) {
    // No directory component: the whole entry is the file name and the path
    // stays empty (substr(0, npos) would otherwise copy the file name into it).
    name = focusedEntry;
  } else {
    path = focusedEntry.substr(0, separator);
    name = focusedEntry.substr(separator + 1);
  }
  set_or_update_attr(flowFile, "filename", name);
  set_or_update_attr(flowFile, "path", path);
  set_or_update_attr(flowFile, "absolute.path", focusedEntry);

  // Transfer to the relationship
  session->transfer(flowFile, Success);
}
// Client data passed to libarchive by ReadCallback::process() and consumed by
// ReadCallback::read_cb().
struct FocusArchiveEntryReadData {
  // Stream that read_cb() pulls the raw archive bytes from.
  std::shared_ptr<io::BaseStream> stream;
  // Processor whose isRunning() state read_cb() polls while reading.
  core::Processor *processor;
  // Buffer that read_cb() fills and hands to libarchive.
  char buf[8196];
};
/**
 * libarchive read callback which reads the next chunk from the flowfile stream.
 *
 * Fills the buffer held in the client data until it is full, the stream
 * reports no more data, or the processor stops running.
 *
 * @param a archive handle, used only to record an error
 * @param d client data; must point to a FocusArchiveEntryReadData
 * @param buf receives the address of the buffer holding the bytes read
 * @return number of bytes placed in the buffer (0 when the stream is
 *         exhausted), or -1 with an archive error set when the processor shut
 *         down or the stream reported a read failure
 */
ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) {
  auto data = static_cast<FocusArchiveEntryReadData *>(d);
  *buf = data->buf;

  const int64_t capacity = static_cast<int64_t>(sizeof(data->buf));
  int64_t read = 0;
  int64_t last_read = 0;
  do {
    // Append after the bytes already collected; reading to the start of the
    // buffer again would overwrite the data of earlier short reads.
    last_read = data->stream->readData(reinterpret_cast<uint8_t *>(data->buf) + read, capacity - read);
    if (last_read > 0) {
      read += last_read;
    }
  } while (data->processor->isRunning() && last_read > 0 && read < capacity);

  if (!data->processor->isRunning()) {
    archive_set_error(a, EINTR, "Processor shut down during read");
    return -1;
  }
  if (last_read < 0) {
    // A negative result is a stream failure, not a byte count.
    archive_set_error(a, ARCHIVE_ERRNO_MISC, "Failed to read from the flow file stream");
    return -1;
  }
  return read;
}
/**
 * Reads the archive from the flow file stream, recording metadata for every
 * entry and extracting each regular file to a temporary file.
 *
 * Metadata is appended to the ArchiveMetadata supplied to the constructor.
 * Processing stops at the end of the archive, when the processor stops
 * running, or at the first header that cannot be read cleanly.
 *
 * @param stream flow file content stream containing the archive
 * @return total size in bytes of the regular-file entries that were extracted
 */
int64_t FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
  int64_t nlen = 0;

  // Own the handle so that it is released on every exit path, including the
  // early error returns below.
  std::unique_ptr<struct archive, decltype(&archive_read_free)> inputArchive(archive_read_new(), &archive_read_free);
  if (!inputArchive) {
    logger_->log_error("FocusArchiveEntry can't allocate archive reader");
    return nlen;
  }

  FocusArchiveEntryReadData data;
  data.stream = stream;
  data.processor = proc_;

  archive_read_support_format_all(inputArchive.get());
  archive_read_support_filter_all(inputArchive.get());

  if (archive_read_open(inputArchive.get(), &data, ok_cb, read_cb, ok_cb) != ARCHIVE_OK) {
    logger_->log_error("FocusArchiveEntry can't open due to archive error: %s", archive_error_string(inputArchive.get()));
    return nlen;
  }

  // Read each item in the archive
  while (isRunning()) {
    struct archive_entry *entry = nullptr;
    const int res = archive_read_next_header(inputArchive.get(), &entry);

    if (res == ARCHIVE_EOF) {
      break;
    }

    // Anything other than ARCHIVE_OK still ends processing, but hard failures
    // (below ARCHIVE_WARN) are reported as errors and the milder statuses as
    // warnings. The severe check must come first to be reachable.
    if (res < ARCHIVE_WARN) {
      logger_->log_error("FocusArchiveEntry can't read header due to archive error: %s", archive_error_string(inputArchive.get()));
      return nlen;
    }
    if (res < ARCHIVE_OK) {
      logger_->log_warn("FocusArchiveEntry got archive warning while reading header: %s", archive_error_string(inputArchive.get()));
      return nlen;
    }

    // The pathname may be unavailable; fall back to an empty name instead of
    // constructing a std::string from a null pointer.
    const char *rawEntryName = archive_entry_pathname(entry);
    if (rawEntryName == nullptr) {
      logger_->log_warn("FocusArchiveEntry found an entry without a readable path name");
    }
    const std::string entryName = rawEntryName != nullptr ? rawEntryName : "";

    if (const char *formatName = archive_format_name(inputArchive.get())) {
      _archiveMetadata->archiveFormatName.assign(formatName);
    }
    _archiveMetadata->archiveFormat = archive_format(inputArchive.get());

    // Record entry metadata
    const auto entryType = archive_entry_filetype(entry);
    ArchiveEntryMetadata metadata;
    metadata.entryName = entryName;
    metadata.entryType = entryType;
    metadata.entryPerm = archive_entry_perm(entry);
    metadata.entrySize = archive_entry_size(entry);
    metadata.entryUID = archive_entry_uid(entry);
    metadata.entryGID = archive_entry_gid(entry);
    metadata.entryMTime = archive_entry_mtime(entry);
    metadata.entryMTimeNsec = archive_entry_mtime_nsec(entry);

    logger_->log_info("FocusArchiveEntry entry type of %s is: %d", entryName.c_str(), metadata.entryType);
    logger_->log_info("FocusArchiveEntry entry perm of %s is: %d", entryName.c_str(), metadata.entryPerm);

    // Write content to tmp file
    if (entryType == AE_IFREG) {
      const auto tmpFileName = file_man_->unique_file(true);
      metadata.tmpFileName = tmpFileName;
      logger_->log_info("FocusArchiveEntry extracting %s to: %s", entryName.c_str(), tmpFileName.c_str());

      // Binary mode keeps the extracted bytes untouched on platforms that
      // distinguish text and binary streams.
      std::unique_ptr<FILE, decltype(&fclose)> tmpFile(fopen(tmpFileName.c_str(), "wb"), &fclose);
      if (!tmpFile) {
        logger_->log_error("FocusArchiveEntry can't open temporary file %s for writing", tmpFileName.c_str());
        return nlen;
      }

      const int64_t entrySize = archive_entry_size(entry);
      if (entrySize > 0) {
        // archive_read_data_into_fd() returns a status code, not a byte
        // count, so it must not be added to nlen.
        const int extractRes = archive_read_data_into_fd(inputArchive.get(), fileno(tmpFile.get()));
        if (extractRes < ARCHIVE_WARN) {
          logger_->log_error("FocusArchiveEntry can't extract %s due to archive error: %s", entryName.c_str(), archive_error_string(inputArchive.get()));
        } else {
          if (extractRes < ARCHIVE_OK) {
            logger_->log_warn("FocusArchiveEntry got archive warning while extracting %s: %s", entryName.c_str(), archive_error_string(inputArchive.get()));
          }
          nlen += entrySize;
        }
      }
    }

    _archiveMetadata->entryMetadata.push_back(metadata);
  }

  archive_read_close(inputArchive.get());
  return nlen;
}
/**
 * Creates a read callback bound to a processor.
 *
 * @param processor processor stored in proc_ and later handed to the libarchive client data
 * @param file_man file manager used by process() to obtain temporary file names
 * @param archiveMetadata destination that process() fills with archive and entry metadata
 */
FocusArchiveEntry::ReadCallback::ReadCallback(core::Processor *processor, fileutils::FileManager *file_man, ArchiveMetadata *archiveMetadata)
    : file_man_(file_man),
      proc_(processor) {
  // Where process() records what it finds in the archive.
  _archiveMetadata = archiveMetadata;
  // Log through the logger registered for FocusArchiveEntry.
  logger_ = logging::LoggerFactory<FocusArchiveEntry>::getLogger();
}
// Nothing to release explicitly; the destructor body is empty.
FocusArchiveEntry::ReadCallback::~ReadCallback() = default;
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */