/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <minifi-cpp/core/FlowFile.h>
#include "aws/kinesis/model/PutRecordsRequest.h"
#include "core/Resource.h"
#include "processors/PutKinesisStream.h"
#include "unit/Catch.h"
#include "unit/SingleProcessorTestController.h"
#include "unit/TestBase.h"
#include "unit/ProcessorUtils.h"
namespace org::apache::nifi::minifi::aws::processors::test {
// Test double for the AWS Kinesis client. PutRecords() is overridden to fabricate an outcome
// chosen by behaviour_ instead of delegating to the base class implementation.
class MockKinesisClient final : public Aws::Kinesis::KinesisClient {
 public:
  // Selects which canned outcome PutRecords() produces.
  enum class KinesisBehaviour {
    HappyPath,           // one successful result entry per request record
    Failure,             // the whole call returns an UNAUTHORIZED error
    RecordSizeMismatch,  // a result without any entries, regardless of the request size
    OddsFail,            // entries alternate: 1st succeeds, 2nd carries an error, 3rd succeeds, ...
  };

  Aws::Kinesis::Model::PutRecordsOutcome PutRecords(const Aws::Kinesis::Model::PutRecordsRequest& request) const override {
    // Produces a successful entry stamped with the next sequence number.
    // sequence_number_ is mutable, so it can be advanced from this const method.
    const auto make_accepted_entry = [this] {
      Aws::Kinesis::Model::PutRecordsResultEntry accepted;
      accepted.SetSequenceNumber(fmt::format("sequence_number_{}", ++sequence_number_));
      accepted.SetShardId("shard_id");
      return accepted;
    };

    switch (behaviour_) {
      case KinesisBehaviour::HappyPath: {
        Aws::Kinesis::Model::PutRecordsResult all_accepted;
        for ([[maybe_unused]] const auto& record : request.GetRecords()) {
          all_accepted.AddRecords(make_accepted_entry());
        }
        return all_accepted;
      }
      case KinesisBehaviour::Failure: {
        auto unauthorized = Aws::Kinesis::KinesisError();
        unauthorized.SetResponseCode(Aws::Http::HttpResponseCode::UNAUTHORIZED);
        unauthorized.SetMessage("Unauthorized");
        return unauthorized;
      }
      case KinesisBehaviour::RecordSizeMismatch: {
        // Deliberately left without entries so the entry count differs from any non-empty request.
        Aws::Kinesis::Model::PutRecordsResult no_entries;
        return no_entries;
      }
      case KinesisBehaviour::OddsFail: {
        Aws::Kinesis::Model::PutRecordsResult partial_result;
        bool reject_entry = false;  // the first entry is accepted, then the outcome flips on every record
        for ([[maybe_unused]] const auto& record : request.GetRecords()) {
          if (reject_entry) {
            Aws::Kinesis::Model::PutRecordsResultEntry rejected;
            rejected.SetErrorCode("8");
            rejected.SetErrorMessage("Some error message");
            partial_result.AddRecords(rejected);
          } else {
            partial_result.AddRecords(make_accepted_entry());
          }
          reject_entry = !reject_entry;
        }
        return partial_result;
      }
      default: {
        throw std::invalid_argument("Unknown behaviour");
      }
    }
  }

  // Outcome selector; public so the owning test processor can set it directly.
  KinesisBehaviour behaviour_ = KinesisBehaviour::HappyPath;
  // Counter behind the generated "sequence_number_N" values; incremented before each use, so the first is 1.
  mutable uint32_t sequence_number_ = 0;
};
// PutKinesisStream variant used by the tests below: getClient() hands out a MockKinesisClient
// configured with this processor's behaviour_.
class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream {  // NOLINT(cppcoreguidelines-special-member-functions)
 public:
  static constexpr const char* Description = "PutKinesisStreamMocked";

  using PutKinesisStream::PutKinesisStream;
  ~PutKinesisStreamMocked() override = default;

  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

  // Ignores the supplied credentials and returns a freshly created mock client that
  // carries a copy of the current behaviour_.
  std::unique_ptr<Aws::Kinesis::KinesisClient> getClient(const Aws::Auth::AWSCredentials& /*credentials*/) override {
    auto mock_client = std::make_unique<MockKinesisClient>();
    mock_client->behaviour_ = behaviour_;
    return mock_client;
  }

  // Behaviour copied into every client created by getClient(); tests assign it before triggering.
  MockKinesisClient::KinesisBehaviour behaviour_ = MockKinesisClient::KinesisBehaviour::HappyPath;
};

REGISTER_RESOURCE(PutKinesisStreamMocked, Processor);
// RecordSizeMismatch makes the mock return a PutRecords result without entries, while two
// records are submitted. Both flow files are expected on Failure with their content intact
// and the "Record size mismatch" error message attribute.
TEST_CASE("PutKinesisStream record size mismatch path") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto mocked_processor = controller.getProcessor<PutKinesisStreamMocked>();
  REQUIRE(mocked_processor);
  mocked_processor.get().behaviour_ = MockKinesisClient::KinesisBehaviour::RecordSizeMismatch;

  controller.plan->setProperty(mocked_processor, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(mocked_processor, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(mocked_processor, PutKinesisStream::AmazonKinesisStreamName, "stream_name");

  const auto routed = controller.trigger({{.content = "foo"}, {.content = "bar"}});
  CHECK(routed.at(PutKinesisStream::Success).empty());

  const auto& failed = routed.at(PutKinesisStream::Failure);
  REQUIRE(failed.size() == 2);
  CHECK(controller.plan->getContent(failed.at(0)) == "foo");
  CHECK(controller.plan->getContent(failed.at(1)) == "bar");
  for (const auto& flow_file : failed) {
    CHECK(flow_file->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Record size mismatch");
  }
}
// Whole-request failure: the mock answers with an UNAUTHORIZED error carrying the message
// "Unauthorized". Both flow files are expected on Failure with error code "401" and that message.
// NOTE(review): despite "record size" in the test name, this case exercises the Failure behaviour;
// the name is left untouched so existing test filters keep matching.
TEST_CASE("PutKinesisStream record size failure path") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto mocked_processor = controller.getProcessor<PutKinesisStreamMocked>();
  REQUIRE(mocked_processor);
  mocked_processor.get().behaviour_ = MockKinesisClient::KinesisBehaviour::Failure;

  controller.plan->setProperty(mocked_processor, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(mocked_processor, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(mocked_processor, PutKinesisStream::AmazonKinesisStreamName, "stream_name");

  const auto routed = controller.trigger({{.content = "foo"}, {.content = "bar"}});
  CHECK(routed.at(PutKinesisStream::Success).empty());

  const auto& failed = routed.at(PutKinesisStream::Failure);
  REQUIRE(failed.size() == 2);
  const auto& first_failed = failed.at(0);
  const auto& second_failed = failed.at(1);

  CHECK(controller.plan->getContent(first_failed) == "foo");
  CHECK(controller.plan->getContent(second_failed) == "bar");

  CHECK(first_failed->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Unauthorized");
  CHECK(first_failed->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == "401");
  CHECK(second_failed->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == "401");
  CHECK(second_failed->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Unauthorized");
}
// Partial failure: with OddsFail the mock accepts the first record and attaches error code "8"
// ("Some error message") to the second. "foo" is therefore expected on Success with the
// sequence number and shard id attributes, and "bar" on Failure with the error attributes.
TEST_CASE("PutKinesisStream partial failure path") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto put_kinesis_stream = controller.getProcessor<PutKinesisStreamMocked>();
  // Abort the test here if the typed lookup failed, rather than calling get() on an empty
  // wrapper; this matches the guard used by the other tests that configure behaviour_.
  REQUIRE(put_kinesis_stream);
  put_kinesis_stream.get().behaviour_ = MockKinesisClient::KinesisBehaviour::OddsFail;

  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AmazonKinesisStreamName, "stream_name");

  const auto result = controller.trigger({{.content = "foo"}, {.content = "bar"}});

  // REQUIRE (not CHECK) because the element accesses below would throw on an empty vector.
  REQUIRE(result.at(PutKinesisStream::Success).size() == 1);
  REQUIRE(result.at(PutKinesisStream::Failure).size() == 1);
  const auto succ_ff = result.at(PutKinesisStream::Success).at(0);
  const auto fail_ff = result.at(PutKinesisStream::Failure).at(0);

  CHECK(controller.plan->getContent(succ_ff) == "foo");
  CHECK(controller.plan->getContent(fail_ff) == "bar");

  CHECK(succ_ff->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_1");
  CHECK(succ_ff->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id");

  CHECK(fail_ff->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == "8");
  CHECK(fail_ff->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "Some error message");
}
// Default (HappyPath) mock behaviour: both flow files are expected on Success, in input order,
// tagged with consecutive sequence numbers and the shard id "shard_id".
TEST_CASE("PutKinesisStream simple happy path") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto processor = controller.getProcessor();

  controller.plan->setProperty(processor, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(processor, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(processor, PutKinesisStream::AmazonKinesisStreamName, "stream_name");

  const auto routed = controller.trigger({{.content = "foo"}, {.content = "bar"}});
  CHECK(routed.at(PutKinesisStream::Failure).empty());

  const auto& delivered = routed.at(PutKinesisStream::Success);
  REQUIRE(delivered.size() == 2);
  const auto& first_delivered = delivered.at(0);
  const auto& second_delivered = delivered.at(1);

  CHECK(controller.plan->getContent(first_delivered) == "foo");
  CHECK(controller.plan->getContent(second_delivered) == "bar");

  CHECK(first_delivered->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_1");
  CHECK(first_delivered->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id");
  CHECK(second_delivered->getAttribute(PutKinesisStream::AwsKinesisSequenceNumber.name) == "sequence_number_2");
  CHECK(second_delivered->getAttribute(PutKinesisStream::AwsKinesisShardId.name) == "shard_id");
}
// Fourteen flow files are queued while MessageBatchSize is 10; exactly ten are expected on
// Success after a single trigger.
TEST_CASE("PutKinesisStream smaller batch size than available ffs") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto processor = controller.getProcessor();

  controller.plan->setProperty(processor, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(processor, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(processor, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
  controller.plan->setProperty(processor, PutKinesisStream::MessageBatchSize, "10");

  const auto routed = controller.trigger({
      {.content = "Lorem"}, {.content = "ipsum"}, {.content = "dolor"}, {.content = "sit"},
      {.content = "amet"}, {.content = "consectetur"}, {.content = "adipiscing"}, {.content = "elit"},
      {.content = "Morbi"}, {.content = "dapibus"}, {.content = "risus"}, {.content = "a"},
      {.content = "bibendum"}, {.content = "luctus"}});

  const auto& delivered = routed.at(PutKinesisStream::Success);
  CHECK(delivered.size() == 10);
}
// MaxBatchDataSize is set to "12 B" on top of a MessageBatchSize of 10. Of the fourteen queued
// flow files only the first three ("Lorem", "ipsum", "dolor") are expected on Success.
TEST_CASE("PutKinesisStream max batch data size fills up") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto processor = controller.getProcessor();

  controller.plan->setProperty(processor, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(processor, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(processor, PutKinesisStream::AmazonKinesisStreamName, "stream_name");
  controller.plan->setProperty(processor, PutKinesisStream::MessageBatchSize, "10");
  controller.plan->setProperty(processor, PutKinesisStream::MaxBatchDataSize, "12 B");

  const auto routed = controller.trigger({
      {.content = "Lorem"}, {.content = "ipsum"}, {.content = "dolor"}, {.content = "sit"},
      {.content = "amet"}, {.content = "consectetur"}, {.content = "adipiscing"}, {.content = "elit"},
      {.content = "Morbi"}, {.content = "dapibus"}, {.content = "risus"}, {.content = "a"},
      {.content = "bibendum"}, {.content = "luctus"}});

  const auto& delivered = routed.at(PutKinesisStream::Success);
  REQUIRE(delivered.size() == 3);
  CHECK(controller.plan->getContent(delivered.at(0)) == "Lorem");
  CHECK(controller.plan->getContent(delivered.at(1)) == "ipsum");
  CHECK(controller.plan->getContent(delivered.at(2)) == "dolor");
}
// Same 12 B data-size cap and batch size of 10 as the "fills up" case, but the stream name is
// resolved from each flow file's "stream_name" attribute and every flow file names a different
// stream. Ten flow files are expected on Success.
// NOTE(review): presumably the data-size cap is tracked per stream, which is why it does not cut
// the batch short here - confirm against the PutKinesisStream implementation.
TEST_CASE("PutKinesisStream max batch data size to different streams") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto processor = controller.getProcessor();

  controller.plan->setProperty(processor, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(processor, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(processor, PutKinesisStream::MessageBatchSize, "10");
  controller.plan->setProperty(processor, PutKinesisStream::MaxBatchDataSize, "12 B");
  controller.plan->setProperty(processor, PutKinesisStream::AmazonKinesisStreamName, "${stream_name}");

  const auto routed = controller.trigger({
      {.content = "Lorem", .attributes = {{"stream_name", "stream_one"}}},
      {.content = "ipsum", .attributes = {{"stream_name", "stream_two"}}},
      {.content = "dolor", .attributes = {{"stream_name", "stream_three"}}},
      {.content = "sit", .attributes = {{"stream_name", "stream_four"}}},
      {.content = "amet", .attributes = {{"stream_name", "stream_five"}}},
      {.content = "consectetur", .attributes = {{"stream_name", "stream_six"}}},
      {.content = "adipiscing", .attributes = {{"stream_name", "stream_seven"}}},
      {.content = "elit", .attributes = {{"stream_name", "stream_eight"}}},
      {.content = "Morbi", .attributes = {{"stream_name", "stream_nine"}}},
      {.content = "dapibus", .attributes = {{"stream_name", "stream_ten"}}},
      {.content = "risus", .attributes = {{"stream_name", "stream_eleven"}}},
      {.content = "a", .attributes = {{"stream_name", "stream_twelve"}}},
      {.content = "bibendum", .attributes = {{"stream_name", "stream_thirteen"}}},
      {.content = "luctus", .attributes = {{"stream_name", "stream_fourteen"}}}});

  const auto& delivered = routed.at(PutKinesisStream::Success);
  CHECK(delivered.size() == 10);
}
// A single payload of 1_MB + 10 bytes is submitted. It is expected on Failure with its content
// unchanged, the "record too big ..." error message, and no error code attribute.
TEST_CASE("PutKinesisStream with too large message") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
  auto processor = controller.getProcessor();

  controller.plan->setProperty(processor, PutKinesisStream::AccessKey, "access_key");
  controller.plan->setProperty(processor, PutKinesisStream::SecretKey, "secret_key");
  controller.plan->setProperty(processor, PutKinesisStream::AmazonKinesisStreamName, "stream_name");

  const std::string oversized_payload(1_MB + 10, 'x');
  const auto routed = controller.trigger(oversized_payload);

  const auto& failed = routed.at(PutKinesisStream::Failure);
  REQUIRE(failed.size() == 1);
  CHECK(routed.at(PutKinesisStream::Success).empty());

  const auto& rejected = failed.at(0);
  CHECK(controller.plan->getContent(rejected) == oversized_payload);
  CHECK(rejected->getAttribute(PutKinesisStream::AwsKinesisErrorMessage.name) == "record too big 1000010, max allowed 1000000");
  CHECK(rejected->getAttribute(PutKinesisStream::AwsKinesisErrorCode.name) == std::nullopt);
}
} // namespace org::apache::nifi::minifi::aws::processors::test