blob: 48d035f04823280ff8e6e7174617d668083f4a58 [file]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <new>
#include <random>
#include <string>
#include "unit/SingleProcessorTestController.h"
#include "unit/Catch.h"
#include "PutUDP.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "utils/net/UdpServer.h"
#include "utils/expected.h"
#include "utils/StringUtils.h"
#include "unit/TestUtils.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::processors {
namespace {
std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer& listener, std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 10ms) {
auto start_time = std::chrono::system_clock::now();
while (start_time + timeout > std::chrono::system_clock::now()) {
if (const auto result = listener.tryDequeue())
return result;
std::this_thread::sleep_for(interval);
}
return std::nullopt;
}
} // namespace
TEST_CASE("PutUDP", "[putudp]") {
test::SingleProcessorTestController controller{minifi::test::utils::make_processor<PutUDP>("PutUDP")};
const auto put_udp = controller.getProcessor();
LogTestController::getInstance().setTrace<PutUDP>();
LogTestController::getInstance().setTrace<core::ProcessContext>();
REQUIRE(put_udp->setProperty(PutUDP::Hostname.name, "${literal('localhost')}"));
utils::net::UdpServer listener{std::nullopt, 0, core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
auto server_thread = std::thread([&listener]() { listener.run(); });
uint16_t port = listener.getPort();
auto deadline = std::chrono::steady_clock::now() + 200ms;
while (port == 0 && deadline > std::chrono::steady_clock::now()) {
std::this_thread::sleep_for(20ms);
port = listener.getPort();
}
auto cleanup_server = gsl::finally([&]{
listener.stop();
server_thread.join();
});
REQUIRE(put_udp->setProperty(PutUDP::Port.name, utils::string::join_pack("${literal('", std::to_string(port), "')}")));
{
const char* const message = "first message: hello";
const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
CHECK(result.at(PutUDP::Failure).empty());
CHECK(controller.plan->getContent(success_flow_files[0]) == message);
auto received_message = tryDequeueWithTimeout(listener);
REQUIRE(received_message);
CHECK(received_message->message_data == message);
CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
CHECK(!received_message->remote_address.to_string().empty());
}
{
const char* const message = "longer message AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; // NOLINT
const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
CHECK(result.at(PutUDP::Failure).empty());
CHECK(controller.plan->getContent(success_flow_files[0]) == message);
auto received_message = tryDequeueWithTimeout(listener);
REQUIRE(received_message);
CHECK(received_message->message_data == message);
CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
CHECK(!received_message->remote_address.to_string().empty());
}
{
LogTestController::getInstance().clear();
auto message = std::string(65536, 'a');
const auto result = controller.trigger(message);
const auto& failure_flow_files = result.at(PutUDP::Failure);
REQUIRE(failure_flow_files.size() == 1);
CHECK(result.at(PutUDP::Success).empty());
CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
CHECK((LogTestController::getInstance().contains("Message too long")
|| LogTestController::getInstance().contains("A message sent on a datagram socket was larger than the internal message buffer")));
}
{
LogTestController::getInstance().clear();
const char* const message = "message for invalid host";
controller.plan->setProperty(put_udp, PutUDP::Hostname, "invalid_hostname");
const auto result = controller.trigger(message);
const auto& failure_flow_files = result.at(PutUDP::Failure);
auto received_message = tryDequeueWithTimeout(listener);
CHECK(!received_message);
REQUIRE(failure_flow_files.size() == 1);
CHECK(result.at(PutUDP::Success).empty());
CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
CHECK((LogTestController::getInstance().contains("Host not found") || LogTestController::getInstance().contains("No such host is known")));
}
}
} // namespace org::apache::nifi::minifi::processors