// blob: bf386898efe97156cb0bd018418d1020586f2a72 [file] [log] [blame]
/**
* @file GenerateFlowFile.cpp
* GenerateFlowFile class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GenerateFlowFile.h"
#include <time.h>
#include <algorithm>
#include <chrono>
#include <cinttypes>
#include <cstring>
#include <limits>
#include <map>
#include <memory>
#include <queue>
#include <random>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "utils/gsl.h"
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/PropertyValidation.h"
#include "core/TypedValues.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Value of the "Data Format" property that selects text output; onSchedule() compares the configured
// value against this constant to decide whether textData_ is set.
const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
// "File Size": optional data-size property, default "1 kB". onSchedule() reads it into fileSize_.
core::Property GenerateFlowFile::FileSize(
core::PropertyBuilder::createProperty("File Size")->withDescription("The size of the file that will be used")->isRequired(false)->withDefaultValue<core::DataSizeValue>("1 kB")->build());
// "Batch Size": optional integer property, default 1. onSchedule() reads it into batchSize_, which
// bounds the per-trigger loop in onTrigger().
core::Property GenerateFlowFile::BatchSize(
core::PropertyBuilder::createProperty("Batch Size")->withDescription("The number of FlowFiles to be transferred in each invocation")->isRequired(false)->withDefaultValue<int>(1)->build());
// "Data Format": optional property restricted to the values "Text" and "Binary", default "Binary".
core::Property GenerateFlowFile::DataFormat(
core::PropertyBuilder::createProperty("Data Format")->withDescription("Specifies whether the data should be Text or Binary")->isRequired(false)->withAllowableValue<std::string>("Text")
->withAllowableValue("Binary")->withDefaultValue("Binary")->build());
// "Unique FlowFiles": optional boolean property, default true. When it is false, onSchedule() generates
// the content once and onTrigger() writes that same buffer into every FlowFile, which is what the
// second sentence of the description tells the user (the original text stopped mid-sentence).
core::Property GenerateFlowFile::UniqueFlowFiles(
  core::PropertyBuilder::createProperty("Unique FlowFiles")
    ->withDescription("If true, each FlowFile that is generated will be unique. "
                      "If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput")
    ->isRequired(false)->withDefaultValue<bool>(true)->build());
// The relationship registered by initialize(); onTrigger() transfers every generated FlowFile to it.
core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
// Alphabet that generateData() samples from when text output is requested. Note that '?' is listed
// twice, so it is drawn twice as often as any other character.
constexpr const char * TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ";
void GenerateFlowFile::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(FileSize);
properties.insert(BatchSize);
properties.insert(DataFormat);
properties.insert(UniqueFlowFiles);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
/**
 * Overwrites `data` in place with random content. The vector's current size decides how many bytes
 * are produced; the vector is never resized, so an empty vector is left untouched.
 *
 * @param data buffer to fill
 * @param textData when true every byte is drawn uniformly from the positions of TEXT_CHARS, otherwise
 *                 every byte is drawn uniformly from the full value range of `char`
 */
void generateData(std::vector<char>& data, bool textData = false) {
  // A new engine, seeded from std::random_device, is created on every call.
  std::random_device rd;
  std::mt19937 eng(rd());
  if (textData) {
    const int index_of_last_char = gsl::narrow<int>(std::strlen(TEXT_CHARS)) - 1;
    std::uniform_int_distribution<> distr(0, index_of_last_char);
    // Translate the random index into its character immediately. Parking the raw index in a `char`
    // first and translating it in a second pass would produce a negative (out-of-bounds) subscript on
    // signed-char platforms as soon as the alphabet grows beyond 128 entries.
    // Capturing by reference also avoids copying the engine state, which std::bind would do.
    std::generate_n(data.begin(), data.size(), [&distr, &eng] { return TEXT_CHARS[distr(eng)]; });
  } else {
    std::uniform_int_distribution<> distr(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
    // The distribution yields an int that is always within the range of char, so the cast is lossless.
    std::generate_n(data.begin(), data.size(), [&distr, &eng] { return static_cast<char>(distr(eng)); });
  }
}
/**
 * Reads the processor configuration from `context` and, when FlowFiles do not have to be unique,
 * pre-generates the content that onTrigger() will write into every FlowFile.
 *
 * @param context source of the configured property values
 */
void GenerateFlowFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
  // NOTE(review): fileSize_ and batchSize_ are presumably 64-bit unsigned (batchSize_ is compared against a
  // uint64_t counter in onTrigger()). "%d" only consumes an int, so the values are cast explicitly and
  // printed with PRIu64, which is correct whatever the declared width of the members is.
  if (context->getProperty(FileSize.getName(), fileSize_)) {
    logger_->log_trace("File size is configured to be %" PRIu64, static_cast<uint64_t>(fileSize_));
  }
  if (context->getProperty(BatchSize.getName(), batchSize_)) {
    logger_->log_trace("Batch size is configured to be %" PRIu64, static_cast<uint64_t>(batchSize_));
  }
  std::string value;
  if (context->getProperty(DataFormat.getName(), value)) {
    // Anything other than the text marker is treated as binary.
    textData_ = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
  }
  if (context->getProperty(UniqueFlowFiles.getName(), uniqueFlowFile_)) {
    logger_->log_trace("Unique Flow files is configured to be %i", uniqueFlowFile_);
  }
  if (!uniqueFlowFile_) {
    // Shared content is produced once here and reused by every onTrigger() invocation.
    data_.resize(gsl::narrow<size_t>(fileSize_));
    generateData(data_, textData_);
  }
}
/**
 * Creates up to batchSize_ FlowFiles, fills each one with content and transfers it to Success.
 *
 * With unique FlowFiles enabled a new buffer of fileSize_ bytes is generated per FlowFile; otherwise
 * the buffer prepared in data_ is written into every FlowFile. If a FlowFile cannot be created the
 * error is logged and the invocation ends; FlowFiles transferred earlier in the loop are not undone here.
 *
 * @param session session used to create, write and transfer the FlowFiles
 */
void GenerateFlowFile::onTrigger(core::ProcessContext* /*context*/, core::ProcessSession *session) {
  for (uint64_t produced = 0; produced < batchSize_; ++produced) {
    std::shared_ptr<core::FlowFile> flowFile = session->create();
    if (flowFile == nullptr) {
      logger_->log_error("Failed to create flowfile!");
      return;
    }

    if (!uniqueFlowFile_) {
      // Every FlowFile receives the shared, pre-generated content.
      GenerateFlowFile::WriteCallback sharedContentWriter(data_);
      session->write(flowFile, &sharedContentWriter);
    } else {
      // Fresh content for this FlowFile; a zero-sized file skips generation altogether.
      std::vector<char> payload(gsl::narrow<size_t>(fileSize_));
      if (fileSize_ > 0) {
        generateData(payload, textData_);
      }
      GenerateFlowFile::WriteCallback uniqueContentWriter(std::move(payload));
      session->write(flowFile, &uniqueContentWriter);
    }

    session->transfer(flowFile, Success);
  }
}
} // namespace processors
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org