blob: a92ba21b3f86915397cf5c7e6e6fcdeb37c5a6c7 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef NIFI_MINIFI_CPP_CUSTOMPROCESSORS_H
#define NIFI_MINIFI_CPP_CUSTOMPROCESSORS_H
#include <unordered_map>
#include <string>
#include <random>
#include <YamlConfiguration.h>
#include "core/Processor.h"
#include "TestBase.h"
#include "processors/GenerateFlowFile.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Relationships that TestProcessor (declared in this header) registers in initialize() and
// transfers incoming flow files to in onTrigger().
// NOTE(review): these are `static` namespace-scope objects in a header, so every translation
// unit that includes this file gets its own copy.
static core::Relationship Apple{"apple", ""};
static core::Relationship Banana{"banana", ""};
// The probability that this processor routes to Apple.
// Declared as a required int property with a default value of 100; TestProcessor reads it in
// onSchedule() and compares it against a random integer draw in onTrigger().
static core::Property AppleProbability = core::PropertyBuilder::createProperty("AppleProbability")->withDefaultValue<int>(100)->isRequired(true)->build();
// The probability that this processor routes to Banana.
// Declared as a required int property with a default value of 0. Banana is only considered when
// the Apple check did not match, and it is compared against the draw reduced by AppleProbability,
// so the two properties are expected to add up to 100; otherwise onTrigger() can fail to route.
static core::Property BananaProbability = core::PropertyBuilder::createProperty("BananaProbability")->withDefaultValue<int>(0)->isRequired(true)->build();
// Mixin carrying per-processor trigger bookkeeping for tests.
// Both members are public so a test can inspect the counter and install a hook directly.
struct ProcessorWithStatistics {
  // Number of onTrigger invocations recorded by the processor deriving from this mixin; starts at 0.
  std::atomic<int> trigger_count{0};

  // Optional hook; empty by default. Deriving processors invoke it from onTrigger when it is set.
  std::function<void()> onTriggerCb_;
};
// Test processor that routes every incoming flow file to either Apple or Banana at random,
// weighted by the AppleProbability and BananaProbability properties.
//
// Each onTrigger call increments trigger_count and, when set, invokes onTriggerCb_
// (both inherited from ProcessorWithStatistics) before any flow file is fetched.
class TestProcessor : public core::Processor, public ProcessorWithStatistics {
 public:
  TestProcessor(const std::string& name, const utils::Identifier &uuid) : Processor(name, uuid) {}
  TestProcessor(const std::string& name) : Processor(name) {}

  // Declares the two probability properties and the two relationships this processor supports.
  void initialize() override {
    setSupportedProperties({AppleProbability, BananaProbability});
    setSupportedRelationships({Apple, Banana});
  }

  // Records the trigger, then routes one flow file from the session (if any).
  //
  // A number is drawn uniformly from 1..100. The flow file goes to Apple when the draw is
  // within the first AppleProbability values, to Banana when it is within the next
  // BananaProbability values.
  //
  // Throws std::runtime_error when the draw falls outside both ranges, which can only happen
  // if the two probabilities add up to less than 100.
  void onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession> &session) override {
    ++trigger_count;
    if (onTriggerCb_) {
      onTriggerCb_();
    }
    auto flowFile = session->get();
    if (!flowFile) return;

    // Read each probability once so a single routing decision uses one consistent snapshot.
    const int apple = apple_probability_.load();
    const int banana = banana_probability_.load();

    std::random_device rd{};
    // The distribution bounds are inclusive. Drawing from 1..100 (rather than 0..100) makes
    // `roll <= apple` true for exactly `apple` out of 100 outcomes, so a probability of 0
    // never matches and a probability of 100 always does.
    std::uniform_int_distribution<int> dis(1, 100);
    int roll = dis(rd);
    if (roll <= apple) {
      session->transfer(flowFile, Apple);
      return;
    }
    // Shift the draw so the Banana range starts right after the Apple range.
    roll -= apple;
    if (roll <= banana) {
      session->transfer(flowFile, Banana);
      return;
    }
    throw std::runtime_error("Couldn't route file");
  }

  // Loads AppleProbability and BananaProbability from the context and stores them for onTrigger.
  // Both lookups are asserted to succeed.
  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) override {
    // Zero-initialised so that a failed lookup in a build without assertions never leads to
    // reading an indeterminate value.
    int apple = 0;
    const bool appleSuccess = context->getProperty(AppleProbability.getName(), apple);
    assert(appleSuccess);
    int banana = 0;
    const bool bananaSuccess = context->getProperty(BananaProbability.getName(), banana);
    assert(bananaSuccess);
    // Keep the flags "used" when assert() compiles away under NDEBUG.
    static_cast<void>(appleSuccess);
    static_cast<void>(bananaSuccess);
    apple_probability_ = apple;
    banana_probability_ = banana;
  }

  // Routing weights used by onTrigger. They start at the declared property defaults
  // (100 for Apple, 0 for Banana) so that a trigger arriving before onSchedule reads
  // well-defined values.
  std::atomic<int> apple_probability_{100};
  std::atomic<int> banana_probability_{0};
};
// GenerateFlowFile wrapper for tests: it records every trigger in trigger_count and, when
// onTriggerCb_ is set, invokes it, before delegating to GenerateFlowFile::onTrigger.
class TestFlowFileGenerator : public processors::GenerateFlowFile, public ProcessorWithStatistics {
 public:
  TestFlowFileGenerator(const std::string& name, const utils::Identifier &uuid)
      : GenerateFlowFile(name, uuid) {
  }

  TestFlowFileGenerator(const std::string& name)
      : GenerateFlowFile(name) {
  }

  // Counts the trigger, runs the optional hook, then forwards to the base implementation,
  // passing the raw pointers held by the shared_ptr arguments.
  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override {
    trigger_count.fetch_add(1);
    if (onTriggerCb_) onTriggerCb_();

    auto* const rawContext = context.get();
    auto* const rawSession = session.get();
    GenerateFlowFile::onTrigger(rawContext, rawSession);
  }
};
// Register both test processors together with a short human-readable description.
// NOTE(review): REGISTER_RESOURCE is defined outside this file; presumably it makes the classes
// instantiable by name (e.g. from a flow configuration) — confirm against the macro definition.
REGISTER_RESOURCE(TestProcessor, "Processor used for testing cycles");
REGISTER_RESOURCE(TestFlowFileGenerator, "Processor generating files and notifying us");
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif // NIFI_MINIFI_CPP_CUSTOMPROCESSORS_H