blob: 6e743561bf4ef74c648d8c77a96100dd3bf22c1c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <random>
#include <string>
#include <utility>
#include "core/ProcessorImpl.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/RelationshipDefinition.h"
#include "core/Resource.h"
#include "processors/GenerateFlowFile.h"
#include "unit/Catch.h"
#include "unit/TestBase.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
static constexpr auto Apple = core::RelationshipDefinition{"apple", ""};
static constexpr auto Banana = core::RelationshipDefinition{"banana", ""};
// The probability that this processor routes to Apple
static constexpr auto AppleProbability = core::PropertyDefinitionBuilder<>::createProperty("AppleProbability")
.withValidator(core::StandardPropertyValidators::INTEGER_VALIDATOR)
.withDefaultValue("100")
.isRequired(true)
.build();
// The probability that this processor routes to Banana
static constexpr auto BananaProbability = core::PropertyDefinitionBuilder<>::createProperty("BananaProbability")
.withValidator(core::StandardPropertyValidators::INTEGER_VALIDATOR)
.withDefaultValue("0")
.isRequired(true)
.build();
class ProcessorWithStatistics {
public:
std::atomic<int> trigger_count{0};
std::function<void()> onTriggerCb_;
};
class TestProcessor : public core::ProcessorImpl, public ProcessorWithStatistics {
public:
using ProcessorImpl::ProcessorImpl;
static constexpr const char* Description = "Processor used for testing cycles";
static constexpr auto Properties = std::to_array<core::PropertyReference>({AppleProbability, BananaProbability});
static constexpr auto Relationships = std::array{Apple, Banana};
static constexpr bool SupportsDynamicProperties = false;
static constexpr bool SupportsDynamicRelationships = false;
static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
void initialize() override {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void onTrigger(core::ProcessContext&, core::ProcessSession& session) override {
++trigger_count;
if (onTriggerCb_) {
onTriggerCb_();
}
auto flowFile = session.get();
if (!flowFile) return;
std::random_device rd{};
std::uniform_int_distribution<int64_t> dis(0, 100);
int64_t rand = dis(rd);
if (rand <= apple_probability_) {
session.transfer(flowFile, Apple);
return;
}
rand -= apple_probability_;
if (rand <= banana_probability_) {
session.transfer(flowFile, Banana);
return;
}
throw std::runtime_error("Couldn't route file");
}
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override {
const int64_t apple = utils::parseI64Property(context, AppleProbability);
const int64_t banana = utils::parseI64Property(context, BananaProbability);
apple_probability_ = apple;
banana_probability_ = banana;
}
std::atomic<int64_t> apple_probability_;
std::atomic<int64_t> banana_probability_;
};
class TestFlowFileGenerator : public processors::GenerateFlowFile, public ProcessorWithStatistics {
public:
using GenerateFlowFile::GenerateFlowFile;
static constexpr const char* Description = "Processor generating files and notifying us";
using processors::GenerateFlowFile::onTrigger;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override {
++trigger_count;
if (onTriggerCb_) {
onTriggerCb_();
}
GenerateFlowFile::onTrigger(context, session);
}
};
REGISTER_RESOURCE(TestProcessor, Processor);
REGISTER_RESOURCE(TestFlowFileGenerator, Processor);
} // namespace org::apache::nifi::minifi::processors