blob: 4b0603de17488bef85254a92e28baa1e61b67e5d [file] [log] [blame]
/**
* @file GenerateFlowFile.cpp
* GenerateFlowFile class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include <random>
#include "GenerateFlowFile.h"
#include "ProcessContext.h"
#include "ProcessSession.h"
const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
"If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
void GenerateFlowFile::initialize()
{
//! Set the supported properties
std::set<Property> properties;
properties.insert(FileSize);
properties.insert(BatchSize);
properties.insert(DataFormat);
properties.insert(UniqueFlowFiles);
setSupportedProperties(properties);
//! Set the supported relationships
std::set<Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *session)
{
int64_t batchSize = 1;
bool uniqueFlowFile = true;
int64_t fileSize = 1024;
std::string value;
if (context->getProperty(FileSize.getName(), value))
{
Property::StringToInt(value, fileSize);
}
if (context->getProperty(BatchSize.getName(), value))
{
Property::StringToInt(value, batchSize);
}
if (context->getProperty(UniqueFlowFiles.getName(), value))
{
Property::StringToBool(value, uniqueFlowFile);
}
if (!uniqueFlowFile)
{
char *data;
data = new char[fileSize];
if (!data)
return;
uint64_t dataSize = fileSize;
GenerateFlowFile::WriteCallback callback(data, dataSize);
char *current = data;
for (int i = 0; i < fileSize; i+= sizeof(int))
{
int randValue = random();
*((int *) current) = randValue;
current += sizeof(int);
}
for (int i = 0; i < batchSize; i++)
{
// For each batch
FlowFileRecord *flowFile = session->create();
if (!flowFile)
return;
if (fileSize > 0)
session->write(flowFile, &callback);
session->transfer(flowFile, Success);
}
delete[] data;
}
else
{
if (!_data)
{
// We have not create the unique data yet
_data = new char[fileSize];
_dataSize = fileSize;
char *current = _data;
for (int i = 0; i < fileSize; i+= sizeof(int))
{
int randValue = random();
*((int *) current) = randValue;
// *((int *) current) = (0xFFFFFFFF & i);
current += sizeof(int);
}
}
GenerateFlowFile::WriteCallback callback(_data, _dataSize);
for (int i = 0; i < batchSize; i++)
{
// For each batch
FlowFileRecord *flowFile = session->create();
if (!flowFile)
return;
if (fileSize > 0)
session->write(flowFile, &callback);
session->transfer(flowFile, Success);
}
}
}