blob: 49f4e3f325c8e9d0fe9a75614368aa5170a011e9 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <new>
#include <random>
#include <string>
#include "SingleProcessorTestController.h"
#include "Catch.h"
#include "PutUDP.h"
#include "core/ProcessContext.h"
#include "utils/net/DNS.h"
#include "utils/net/Socket.h"
#include "utils/expected.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::processors {
namespace {
struct DatagramListener {
DatagramListener(const char* const hostname, const char* const port)
:resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::Udp).value()},
open_socket_{utils::net::open_socket(*resolved_names_)
| utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })}
{
const auto bind_result = bind(open_socket_.socket_.get(), open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
if (bind_result == utils::net::SocketError) {
throw std::runtime_error{utils::StringUtils::join_pack("bind: ", utils::net::get_last_socket_error().message())};
}
}
struct ReceiveResult {
std::string remote_address;
std::string message;
};
[[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) const {
ReceiveResult result;
result.message.resize(max_message_size);
sockaddr_storage remote_address{};
socklen_t addrlen = sizeof(remote_address);
const auto recv_result = recvfrom(open_socket_.socket_.get(), result.message.data(), result.message.size(), 0, std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
if (recv_result == utils::net::SocketError) {
throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", utils::net::get_last_socket_error().message())};
}
result.message.resize(gsl::narrow<size_t>(recv_result));
result.remote_address = utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
return result;
}
std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
utils::net::OpenSocketResult open_socket_;
};
} // namespace
// Testing the failure relationship is not required, because since UDP in general without guarantees, flow files are always routed to success, unless there is
// some weird IO error with the content repo.
TEST_CASE("PutUDP", "[putudp]") {
const auto putudp = std::make_shared<PutUDP>("PutUDP");
auto random_engine = std::mt19937{std::random_device{}()}; // NOLINT: "Missing space before { [whitespace/braces] [5]"
// most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those
const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine);
const auto port_str = std::to_string(port);
test::SingleProcessorTestController controller{putudp};
LogTestController::getInstance().setTrace<PutUDP>();
LogTestController::getInstance().setTrace<core::ProcessContext>();
LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, "org::apache::nifi::minifi::core::ProcessContextExpr");
putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
putudp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", port_str, "')}"));
DatagramListener listener{"localhost", port_str.c_str()};
{
const char* const message = "first message: hello";
const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
REQUIRE(result.at(PutUDP::Failure).empty());
REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
auto receive_result = listener.receive();
REQUIRE(receive_result.message == message);
REQUIRE(!receive_result.remote_address.empty());
}
{
const char* const message = "longer message AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; // NOLINT
const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
REQUIRE(result.at(PutUDP::Failure).empty());
REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
auto receive_result = listener.receive();
REQUIRE(receive_result.message == message);
REQUIRE(!receive_result.remote_address.empty());
}
}
} // namespace org::apache::nifi::minifi::processors