blob: 7ff2ee3aca6b3f93ffff9044343bafd3f721f05c [file] [log] [blame]
/**
* @file GenerateFlowFile.cpp
* GenerateFlowFile class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GenerateFlowFile.h"
#include <limits>
#include <memory>
#include <random>
#include <string>
#include <vector>
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "utils/OptionalUtils.h"
#include "utils/gsl.h"
#include "utils/ProcessorConfigUtils.h"
#include "minifi-cpp/core/ProcessorDescriptor.h"
namespace org::apache::nifi::minifi::processors {
const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
constexpr const char * TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ";
void GenerateFlowFile::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void generateData(std::vector<char>& data, const bool text_data = false) {
std::random_device rd;
std::mt19937 eng(rd());
if (text_data) {
const int index_of_last_char = gsl::narrow<int>(strlen(TEXT_CHARS)) - 1;
std::uniform_int_distribution<> distr(0, index_of_last_char);
std::generate_n(data.begin(), data.size(), [&] { return TEXT_CHARS[static_cast<uint8_t>(distr(eng))]; });
} else {
std::uniform_int_distribution<> distr(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
auto rand = [&distr, &eng] { return distr(eng); };
std::generate_n(data.begin(), data.size(), rand);
}
}
GenerateFlowFile::Mode GenerateFlowFile::getMode(bool is_unique, bool is_text, bool has_custom_text, uint64_t file_size) {
if (is_text && !is_unique && has_custom_text)
return Mode::CustomText;
if (file_size == 0)
return Mode::Empty;
if (is_unique) {
if (is_text)
return Mode::UniqueText;
else
return Mode::UniqueByte;
} else {
if (is_text)
return Mode::NotUniqueText;
else
return Mode::NotUniqueByte;
}
}
void GenerateFlowFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
bool is_text = context.getProperty(DataFormat)
| utils::transform([](const std::string& data_format) { return data_format == DATA_FORMAT_TEXT;})
| utils::valueOrElse([]() {return false;});
bool is_unique = utils::parseOptionalBoolProperty(context, UniqueFlowFiles).value_or(true);
const bool has_custom_text = context.hasNonEmptyProperty(CustomText.name);
file_size_ = utils::parseDataSizeProperty(context, FileSize);
batch_size_ = utils::parseU64Property(context, BatchSize);
mode_ = getMode(is_unique, is_text, has_custom_text, file_size_);
if (!isUnique(mode_)) {
non_unique_data_.resize(gsl::narrow<size_t>(file_size_));
generateData(non_unique_data_, isText(mode_));
}
logger_->log_trace("GenerateFlowFile is configured in {} mode", magic_enum::enum_name(mode_));
if (mode_ != Mode::CustomText && has_custom_text)
logger_->log_warn("Custom Text property is set but not used. "
"For Custom Text to be used, Data Format needs to be Text, and Unique FlowFiles needs to be false.");
}
// The custom text has to be reevaluated once per batch
void GenerateFlowFile::refreshNonUniqueData(core::ProcessContext& context) {
if (mode_ != Mode::CustomText)
return;
std::string custom_text = context.getProperty(CustomText).value_or("");
non_unique_data_.assign(custom_text.begin(), custom_text.end());
}
void GenerateFlowFile::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
refreshNonUniqueData(context);
for (uint64_t i = 0; i < batch_size_; i++) {
std::shared_ptr<core::FlowFile> flow_file = session.create();
if (!flow_file) {
logger_->log_error("Failed to create flowfile!");
return;
}
if (mode_ == Mode::Empty) {
// the newly created flowfile is empty by default
} else if (isUnique(mode_)) {
std::vector<char> unique_data(gsl::narrow<size_t>(file_size_));
generateData(unique_data, isText(mode_));
session.writeBuffer(flow_file, unique_data);
} else {
session.writeBuffer(flow_file, non_unique_data_);
}
session.transfer(flow_file, Success);
}
}
REGISTER_RESOURCE(GenerateFlowFile, Processor);
} // namespace org::apache::nifi::minifi::processors