blob: 696d3b4dc6593576dbce622915407ca5471d7eae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "../processors/FetchGCSObject.h"
#include "../controllerservices/GCPCredentialsControllerService.h"
#include "GCPAttributes.h"
#include "core/Resource.h"
#include "SingleProcessorTestController.h"
#include "google/cloud/storage/testing/mock_client.h"
#include "google/cloud/storage/internal/object_metadata_parser.h"
#include "google/cloud/storage/testing/canonical_errors.h"
namespace gcs = ::google::cloud::storage;
namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
using FetchGCSObject = org::apache::nifi::minifi::extensions::gcp::FetchGCSObject;
using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
using ::google::cloud::storage::testing::canonical_errors::TransientError;
using ::google::cloud::storage::testing::canonical_errors::PermanentError;
namespace {
class FetchGCSObjectMocked : public FetchGCSObject {
using org::apache::nifi::minifi::extensions::gcp::FetchGCSObject::FetchGCSObject;
public:
gcs::Client getClient() const override {
return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
}
std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
};
REGISTER_RESOURCE(FetchGCSObjectMocked, "FetchGCSObjectMocked");
} // namespace
class FetchGCSObjectTests : public ::testing::Test {
public:
void SetUp() override {
gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
test_controller_.plan->setProperty(gcp_credentials_node_,
GCPCredentialsControllerService::CredentialsLoc.getName(),
toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
test_controller_.plan->setProperty(fetch_gcs_object_,
FetchGCSObject::GCPCredentials.getName(),
"gcp_credentials_controller_service");
}
std::shared_ptr<FetchGCSObjectMocked> fetch_gcs_object_ = std::make_shared<FetchGCSObjectMocked>("FetchGCSObjectMocked");
org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{fetch_gcs_object_};
std::shared_ptr<minifi::core::controller::ControllerServiceNode> gcp_credentials_node_;
};
TEST_F(FetchGCSObjectTests, MissingBucket) {
EXPECT_CALL(*fetch_gcs_object_->mock_client_, CreateResumableSession).Times(0);
EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket.getName(), ""));
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
}
TEST_F(FetchGCSObjectTests, ServerError) {
EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
.WillOnce([](gcs::internal::ReadObjectRangeRequest const& request) {
EXPECT_EQ(request.bucket_name(), "bucket-from-property") << request;
auto* mock_source = new gcs::testing::MockObjectReadSource;
::testing::InSequence seq;
EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(true));
EXPECT_CALL(*mock_source, Read)
.WillOnce(testing::Return(google::cloud::Status(
google::cloud::StatusCode::kInvalidArgument,
"Invalid Argument")));
EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(false));
std::unique_ptr<gcs::internal::ObjectReadSource> result(mock_source);
return google::cloud::make_status_or(std::move(result));
});
EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket.getName(), "bucket-from-property"));
const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
}
TEST_F(FetchGCSObjectTests, HappyPath) {
std::string const text = "stored text";
std::size_t offset = 0;
// Simulate a Read() call in the MockObjectReadSource object created below
auto simulate_read = [&text, &offset](void* buf, std::size_t n) {
auto const l = (std::min)(n, text.size() - offset);
std::memcpy(buf, text.data() + offset, l);
offset += l;
return gcs::internal::ReadSourceResult{
l, gcs::internal::HttpResponse{200, {}, {}}};
};
EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
.WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) {
EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request;
EXPECT_TRUE(request.HasOption<gcs::Generation>());
EXPECT_TRUE(request.GetOption<gcs::Generation>().has_value());
EXPECT_EQ(23, request.GetOption<gcs::Generation>().value());
std::unique_ptr<gcs::testing::MockObjectReadSource> mock_source(new gcs::testing::MockObjectReadSource);
::testing::InSequence seq;
EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true));
EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read);
EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false));
return google::cloud::make_status_or(
std::unique_ptr<gcs::internal::ObjectReadSource>(
std::move(mock_source)));
});
EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23"}});
ASSERT_EQ(1, result.at(FetchGCSObject::Success).size());
EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size());
EXPECT_EQ("stored text", test_controller_.plan->getContent(result.at(FetchGCSObject::Success)[0]));
}
TEST_F(FetchGCSObjectTests, EmptyGeneration) {
std::string const text = "stored text";
std::size_t offset = 0;
// Simulate a Read() call in the MockObjectReadSource object created below
auto simulate_read = [&text, &offset](void* buf, std::size_t n) {
auto const l = (std::min)(n, text.size() - offset);
std::memcpy(buf, text.data() + offset, l);
offset += l;
return gcs::internal::ReadSourceResult{
l, gcs::internal::HttpResponse{200, {}, {}}};
};
EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
.WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) {
EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request;
EXPECT_FALSE(request.HasOption<gcs::Generation>());
std::unique_ptr<gcs::testing::MockObjectReadSource> mock_source(new gcs::testing::MockObjectReadSource);
::testing::InSequence seq;
EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true));
EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read);
EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false));
return google::cloud::make_status_or(
std::unique_ptr<gcs::internal::ObjectReadSource>(
std::move(mock_source)));
});
EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
ASSERT_EQ(1, result.at(FetchGCSObject::Success).size());
EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size());
EXPECT_EQ("stored text", test_controller_.plan->getContent(result.at(FetchGCSObject::Success)[0]));
}
TEST_F(FetchGCSObjectTests, InvalidGeneration) {
EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23 banana"}});
ASSERT_EQ(0, result.at(FetchGCSObject::Success).size());
EXPECT_EQ(1, result.at(FetchGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
}