blob: a8355b50f753d9d937cbc7af9ef6072870a175ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "../processors/PutGCSObject.h"
#include "../controllerservices/GCPCredentialsControllerService.h"
#include "GCPAttributes.h"
#include "core/Resource.h"
#include "unit/SingleProcessorTestController.h"
#include "google/cloud/storage/testing/mock_client.h"
#include "google/cloud/storage/internal/object_metadata_parser.h"
#include "google/cloud/storage/retry_policy.h"
#include "google/cloud/storage/testing/canonical_errors.h"
#include "unit/ProcessorUtils.h"
namespace gcs = ::google::cloud::storage;
namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
using PutGCSObject = org::apache::nifi::minifi::extensions::gcp::PutGCSObject;
using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
using ResumableUploadRequest = gcs::internal::ResumableUploadRequest;
using QueryResumableUploadResponse = gcs::internal::QueryResumableUploadResponse;
using ::google::cloud::storage::testing::canonical_errors::TransientError;
using ::google::cloud::storage::testing::canonical_errors::PermanentError;
namespace {
class PutGCSObjectMocked : public PutGCSObject {
using org::apache::nifi::minifi::extensions::gcp::PutGCSObject::PutGCSObject;
public:
static constexpr const char* Description = "PutGCSObjectMocked";
gcs::Client getClient() const override {
return gcs::testing::UndecoratedClientFromMock(mock_client_);
}
std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
};
REGISTER_RESOURCE(PutGCSObjectMocked, Processor);
} // namespace
class PutGCSObjectTests : public ::testing::Test {
public:
void SetUp() override {
put_gcs_object_ = test_controller_.getProcessor();
gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
test_controller_.plan->setProperty(gcp_credentials_node_,
GCPCredentialsControllerService::CredentialsLoc,
magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
test_controller_.plan->setProperty(put_gcs_object_,
PutGCSObject::GCPCredentials,
"gcp_credentials_controller_service");
}
TypedProcessorWrapper<PutGCSObjectMocked> put_gcs_object_;
org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_processor<PutGCSObjectMocked>("PutGCSObjectMocked")};
std::shared_ptr<minifi::core::controller::ControllerServiceNode> gcp_credentials_node_;
static auto return_upload_done(const ResumableUploadRequest& request) {
using ObjectMetadataParser = gcs::internal::ObjectMetadataParser;
nlohmann::json metadata_json;
metadata_json["name"] = request.object_name();
metadata_json["bucket"] = request.bucket_name();
metadata_json["size"] = 10;
if (request.HasOption<gcs::EncryptionKey>()) {
metadata_json["customerEncryption"]["encryptionAlgorithm"] = "AES256";
metadata_json["customerEncryption"]["keySha256"] = "zkeXIcAB56dkHp0z1023TQZ+mzm+fZ5JRVgmAQ3bEVE=";
}
return testing::Return(google::cloud::make_status_or(QueryResumableUploadResponse{absl::nullopt, *ObjectMetadataParser::FromJson(metadata_json)}));
}
};
TEST_F(PutGCSObjectTests, MissingBucket) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).Times(0);
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, ""));
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}
TEST_F(PutGCSObjectTests, BucketFromAttribute) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_EQ("bucket-from-attribute", request.bucket_name());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "${gcs.bucket}"));
const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}});
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}
TEST_F(PutGCSObjectTests, ServerGivesTransientErrors) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).WillOnce(testing::Return(TransientError()));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::NumberOfRetries, "2"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}
TEST_F(PutGCSObjectTests, ServerGivesPermaError) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).WillOnce(testing::Return(PermanentError()));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}
TEST_F(PutGCSObjectTests, NonRequiredPropertiesAreMissing) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_FALSE(request.HasOption<gcs::MD5HashValue>());
EXPECT_FALSE(request.HasOption<gcs::Crc32cChecksumValue>());
EXPECT_FALSE(request.HasOption<gcs::PredefinedAcl>());
EXPECT_FALSE(request.HasOption<gcs::IfGenerationMatch>());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
}
TEST_F(PutGCSObjectTests, Crc32cMD5LocationTest) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_TRUE(request.HasOption<gcs::Crc32cChecksumValue>());
EXPECT_EQ("yZRlqg==", request.GetOption<gcs::Crc32cChecksumValue>().value());
EXPECT_TRUE(request.HasOption<gcs::MD5HashValue>());
EXPECT_EQ("XrY7u+Ae7tCTyyK7j1rNww==", request.GetOption<gcs::MD5HashValue>().value());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::MD5Hash, "${md5}"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Crc32cChecksum, "${crc32c}"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
}
TEST_F(PutGCSObjectTests, DontOverwriteTest) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_TRUE(request.HasOption<gcs::IfGenerationMatch>());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::OverwriteObject, "false"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}
TEST_F(PutGCSObjectTests, ValidServerSideEncryptionTest) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_TRUE(request.HasOption<gcs::EncryptionKey>());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey, "ZW5jcnlwdGlvbl9rZXk="));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_ALGORITHM_ATTR));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}
TEST_F(PutGCSObjectTests, InvalidServerSideEncryptionTest) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).Times(0);
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey, "not_base64_key"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
EXPECT_THROW(test_controller_.trigger("hello world"), minifi::Exception);
}
TEST_F(PutGCSObjectTests, NoContentType) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_FALSE(request.HasOption<gcs::ContentType>());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}
TEST_F(PutGCSObjectTests, ContentTypeFromAttribute) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_TRUE(request.HasOption<gcs::ContentType>());
EXPECT_EQ("text/attribute", request.GetOption<gcs::ContentType>().value());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
const auto& result = test_controller_.trigger("hello world", {{"mime.type", "text/attribute"}});
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}
TEST_F(PutGCSObjectTests, ObjectACLTest) {
EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload)
.WillOnce([this](const ResumableUploadRequest& request) {
EXPECT_TRUE(request.HasOption<gcs::PredefinedAcl>());
EXPECT_EQ(gcs::PredefinedAcl::AuthenticatedRead().value(), request.GetOption<gcs::PredefinedAcl>().value());
EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request));
return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"};
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::ObjectACL, magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::AUTHENTICATED_READ)));
const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}
TEST_F(PutGCSObjectTests, PredefinedACLTests) {
EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::AUTHENTICATED_READ), gcs::PredefinedAcl::AuthenticatedRead().value());
EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::BUCKET_OWNER_FULL_CONTROL), gcs::PredefinedAcl::BucketOwnerFullControl().value());
EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::BUCKET_OWNER_READ_ONLY), gcs::PredefinedAcl::BucketOwnerRead().value());
EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::PRIVATE), gcs::PredefinedAcl::Private().value());
EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::PROJECT_PRIVATE), gcs::PredefinedAcl::ProjectPrivate().value());
EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::PUBLIC_READ_ONLY), gcs::PredefinedAcl::PublicRead().value());
EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::PUBLIC_READ_WRITE), gcs::PredefinedAcl::PublicReadWrite().value());
}