blob: b91b4c098124cbab61a4f1ecd4a0e12be9f17539 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <set>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>
#include "TestBase.h"
#include "FlowFileRecord.h"
#include "core/Processor.h"
namespace org::apache::nifi::minifi::test {
class SingleProcessorTestController : public TestController {
public:
explicit SingleProcessorTestController(const std::shared_ptr<core::Processor>& processor)
: processor_{plan->addProcessor(processor, processor->getName())}
{}
auto trigger() {
plan->runProcessor(processor_);
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result;
for (const auto& [relationship, connection]: outgoing_connections_) {
std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
std::vector<std::shared_ptr<core::FlowFile>> output_flow_files;
while (connection->isWorkAvailable()) {
auto output_flow_file = connection->poll(expired_flow_files);
assert(expired_flow_files.empty());
if (!output_flow_file) continue;
output_flow_files.push_back(std::move(output_flow_file));
}
result.insert_or_assign(relationship, std::move(output_flow_files));
}
return result;
}
auto trigger(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
const auto new_flow_file = createFlowFile(input_flow_file_content, std::move(input_flow_file_attributes));
input_->put(new_flow_file);
return trigger();
}
core::Relationship addDynamicRelationship(std::string name) {
auto relationship = core::Relationship{std::move(name), ""};
outgoing_connections_.insert_or_assign(relationship, plan->addConnection(processor_, relationship, nullptr));
return relationship;
}
private:
std::shared_ptr<core::FlowFile> createFlowFile(const std::string_view content, std::unordered_map<std::string, std::string> attributes) {
const auto flow_file = std::make_shared<FlowFileRecord>();
for (auto& attr : std::move(attributes)) {
flow_file->setAttribute(attr.first, std::move(attr.second));
}
auto content_session = plan->getContentRepo()->createSession();
auto claim = content_session->create();
auto stream = content_session->write(claim);
stream->write(reinterpret_cast<const uint8_t*>(content.data()), content.size());
flow_file->setResourceClaim(claim);
flow_file->setSize(stream->size());
flow_file->setOffset(0);
stream->close();
content_session->commit();
return flow_file;
}
public:
std::shared_ptr<TestPlan> plan = createPlan();
protected:
core::Processor& getProcessor() const { return *processor_; }
private:
std::shared_ptr<core::Processor> processor_;
std::unordered_map<core::Relationship, Connection*> outgoing_connections_{[this] {
std::unordered_map<core::Relationship, Connection*> result;
for (const auto& relationship: processor_->getSupportedRelationships()) {
result.insert_or_assign(relationship, plan->addConnection(processor_, relationship, nullptr));
}
return result;
}()};
Connection* input_ = plan->addConnection(nullptr, core::Relationship{"success", "success"}, processor_);
};
} // namespace org::apache::nifi::minifi::test