| /** |
| * @file GenerateFlowFile.cpp |
| * GenerateFlowFile class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "processors/GenerateFlowFile.h" |
| #include <sys/time.h> |
| #include <time.h> |
| #include <vector> |
| #include <queue> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <set> |
| #include <chrono> |
| #include <thread> |
| #include <random> |
| #include "utils/StringUtils.h" |
| #include "core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace processors { |
| const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary"; |
| const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text"; |
| core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB"); |
| core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1"); |
| core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY); |
| core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true"); |
| core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record"); |
| const unsigned int TEXT_LEN = 90; |
| static const char TEXT_CHARS[TEXT_LEN+1] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t "; |
| |
| void GenerateFlowFile::initialize() { |
| // Set the supported properties |
| std::set<core::Property> properties; |
| properties.insert(FileSize); |
| properties.insert(BatchSize); |
| properties.insert(DataFormat); |
| properties.insert(UniqueFlowFiles); |
| setSupportedProperties(properties); |
| // Set the supported relationships |
| std::set<core::Relationship> relationships; |
| relationships.insert(Success); |
| setSupportedRelationships(relationships); |
| } |
| |
| void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { |
| int64_t batchSize = 1; |
| bool uniqueFlowFile = true; |
| int64_t fileSize = 1024; |
| bool textData = false; |
| |
| std::string value; |
| if (context->getProperty(FileSize.getName(), value)) { |
| core::Property::StringToInt(value, fileSize); |
| } |
| if (context->getProperty(BatchSize.getName(), value)) { |
| core::Property::StringToInt(value, batchSize); |
| } |
| if (context->getProperty(DataFormat.getName(), value)) { |
| textData = (value == GenerateFlowFile::DATA_FORMAT_TEXT); |
| } |
| if (context->getProperty(UniqueFlowFiles.getName(), value)) { |
| org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile); |
| } |
| |
| if (uniqueFlowFile) { |
| char *data; |
| data = new char[fileSize]; |
| if (!data) |
| return; |
| uint64_t dataSize = fileSize; |
| GenerateFlowFile::WriteCallback callback(data, dataSize); |
| char *current = data; |
| if (textData) { |
| for (int i = 0; i < fileSize; i++) { |
| int randValue = random(); |
| data[i] = TEXT_CHARS[randValue % TEXT_LEN]; |
| } |
| } else { |
| for (int i = 0; i < fileSize; i += sizeof(int)) { |
| int randValue = random(); |
| *(reinterpret_cast<int*>(current)) = randValue; |
| current += sizeof(int); |
| } |
| } |
| for (int i = 0; i < batchSize; i++) { |
| // For each batch |
| std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); |
| if (!flowFile) |
| return; |
| if (fileSize > 0) |
| session->write(flowFile, &callback); |
| session->transfer(flowFile, Success); |
| } |
| delete[] data; |
| } else { |
| if (!_data) { |
| // We have not created the data yet |
| _data = new char[fileSize]; |
| _dataSize = fileSize; |
| char *current = _data; |
| if (textData) { |
| for (int i = 0; i < fileSize; i++) { |
| int randValue = random(); |
| _data[i] = TEXT_CHARS[randValue % TEXT_LEN]; |
| } |
| } else { |
| for (int i = 0; i < fileSize; i += sizeof(int)) { |
| int randValue = random(); |
| *(reinterpret_cast<int*>(current)) = randValue; |
| current += sizeof(int); |
| } |
| } |
| } |
| GenerateFlowFile::WriteCallback callback(_data, _dataSize); |
| for (int i = 0; i < batchSize; i++) { |
| // For each batch |
| std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); |
| if (!flowFile) |
| return; |
| if (fileSize > 0) |
| session->write(flowFile, &callback); |
| session->transfer(flowFile, Success); |
| } |
| } |
| } |
| } /* namespace processors */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |