blob: 9e15f63e3c7670c1cb3fcd73159c1c24ad03b131 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "s3/MultipartUploadStateStorage.h"
#include "utils/Environment.h"
#include "MockS3RequestSender.h"
#include "controllers/VolatileMapStateStorage.h"
#include "controllers/keyvalue/KeyValueStateManager.h"
namespace org::apache::nifi::minifi::test {
class MultipartUploadStateStorageTestFixture {
public:
MultipartUploadStateStorageTestFixture() {
LogTestController::getInstance().setDebug<minifi::aws::s3::MultipartUploadStateStorage>();
state_storage_ = std::make_unique<minifi::controllers::VolatileMapStateStorage>("KeyValueStateStorage");
state_manager_ = std::make_unique<minifi::controllers::KeyValueStateManager>(minifi::utils::IdGenerator::getIdGenerator()->generate(), gsl::make_not_null(state_storage_.get()));
upload_storage_ = std::make_unique<minifi::aws::s3::MultipartUploadStateStorage>(gsl::make_not_null(state_manager_.get()));
}
protected:
std::unique_ptr<minifi::controllers::KeyValueStateStorage> state_storage_;
std::unique_ptr<core::StateManager> state_manager_;
std::unique_ptr<minifi::aws::s3::MultipartUploadStateStorage> upload_storage_;
};
TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Store and get current key state", "[s3StateStorage]") {
REQUIRE(upload_storage_->getState("test_bucket", "key") == std::nullopt);
minifi::aws::s3::MultipartUploadState state("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state.uploaded_parts = 2;
state.uploaded_size = 100_MiB;
state.uploaded_etags = {"etag1", "etag2"};
upload_storage_->storeState("test_bucket", "key", state);
REQUIRE(*upload_storage_->getState("test_bucket", "key") == state);
}
TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Get key upload state from multiple keys and buckets", "[s3StateStorage]") {
minifi::aws::s3::MultipartUploadState state1("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state1.uploaded_parts = 3;
state1.uploaded_size = 150_MiB;
state1.uploaded_etags = {"etag1", "etag2", "etag3"};
upload_storage_->storeState("old_bucket", "key1", state1);
minifi::aws::s3::MultipartUploadState state2("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 2;
state2.uploaded_size = 100_MiB;
state2.uploaded_etags = {"etag4", "etag5"};
upload_storage_->storeState("test_bucket", "key1", state2);
minifi::aws::s3::MultipartUploadState state3("id3", 50_MiB, 400_MiB, Aws::Utils::DateTime::Now());
state3.uploaded_parts = 4;
state3.uploaded_size = 200_MiB;
state3.uploaded_etags = {"etag6", "etag7", "etag9", "etag8"};
upload_storage_->storeState("test_bucket", "key2", state3);
minifi::aws::s3::MultipartUploadState state4("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 3;
state2.uploaded_size = 150_MiB;
state2.uploaded_etags = {"etag4", "etag5", "etag10"};
upload_storage_->storeState("test_bucket", "key1", state4);
REQUIRE(*upload_storage_->getState("test_bucket", "key1") == state4);
REQUIRE(*upload_storage_->getState("old_bucket", "key1") == state1);
REQUIRE(*upload_storage_->getState("test_bucket", "key2") == state3);
}
TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Remove state", "[s3StateStorage]") {
minifi::aws::s3::MultipartUploadState state1("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state1.uploaded_parts = 3;
state1.uploaded_size = 150_MiB;
state1.uploaded_etags = {"etag1", "etag2", "etag3"};
upload_storage_->storeState("old_bucket", "key1", state1);
minifi::aws::s3::MultipartUploadState state2("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 2;
state2.uploaded_size = 100_MiB;
state2.uploaded_etags = {"etag4", "etag5"};
upload_storage_->storeState("test_bucket", "key1", state2);
minifi::aws::s3::MultipartUploadState state3("id3", 50_MiB, 400_MiB, Aws::Utils::DateTime::Now());
state3.uploaded_parts = 4;
state3.uploaded_size = 200_MiB;
state3.uploaded_etags = {"etag6", "etag7", "etag9", "etag8"};
upload_storage_->storeState("test_bucket", "key2", state3);
REQUIRE(*upload_storage_->getState("old_bucket", "key1") == state1);
REQUIRE(upload_storage_->getState("test_bucket", "key1") == state2);
REQUIRE(*upload_storage_->getState("test_bucket", "key2") == state3);
upload_storage_->removeState("test_bucket", "key1");
REQUIRE(*upload_storage_->getState("old_bucket", "key1") == state1);
REQUIRE(upload_storage_->getState("test_bucket", "key1") == std::nullopt);
REQUIRE(*upload_storage_->getState("test_bucket", "key2") == state3);
}
TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Remove aged off state", "[s3StateStorage]") {
using namespace std::literals::chrono_literals;
minifi::aws::s3::MultipartUploadState state1("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now() - 10min);
state1.uploaded_parts = 3;
state1.uploaded_size = 150_MiB;
state1.uploaded_etags = {"etag1", "etag2", "etag3"};
upload_storage_->storeState("test_bucket", "key1", state1);
minifi::aws::s3::MultipartUploadState state2("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 2;
state2.uploaded_size = 100_MiB;
state2.uploaded_etags = {"etag4", "etag5"};
upload_storage_->storeState("test_bucket", "key2", state2);
upload_storage_->removeAgedStates(10min);
CHECK(upload_storage_->getState("test_bucket", "key1") == std::nullopt);
CHECK(upload_storage_->getState("test_bucket", "key2") == state2);
}
} // namespace org::apache::nifi::minifi::test