blob: cc56a8af620fcc605e0b287924eec50367a264bf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/result.h"
#include "parquet/encryption/file_key_unwrapper.h"
#include "parquet/encryption/file_key_wrapper.h"
#include "parquet/encryption/key_material.h"
#include "parquet/encryption/key_toolkit.h"
#include "parquet/encryption/file_system_key_material_store.h"
#include "parquet/encryption/key_toolkit_internal.h"
namespace parquet::encryption {
std::shared_ptr<KmsClient> KeyToolkit::GetKmsClient(
const KmsConnectionConfig& kms_connection_config, double cache_entry_lifetime_ms) {
if (kms_client_factory_ == nullptr) {
throw ParquetException("No KmsClientFactory is registered.");
}
auto kms_client_per_kms_instance_cache = kms_client_cache_.GetOrCreateInternalCache(
kms_connection_config.key_access_token(), cache_entry_lifetime_ms);
return kms_client_per_kms_instance_cache->GetOrInsert(
kms_connection_config.kms_instance_id, [this, kms_connection_config]() {
return this->kms_client_factory_->CreateKmsClient(kms_connection_config);
});
}
void KeyToolkit::RotateMasterKeys(
const KmsConnectionConfig& kms_connection_config,
const std::string& parquet_file_path,
const std::shared_ptr<::arrow::fs::FileSystem>& file_system, bool double_wrapping,
double cache_lifetime_seconds) {
// If process wrote files with double-wrapped keys, clean KEK cache (since master keys
// are changing). Only once for each key rotation cycle; not for every file.
const auto now = internal::CurrentTimePoint();
auto lock = last_cache_clean_for_key_rotation_time_mutex_.Lock();
if (now > last_cache_clean_for_key_rotation_time_ +
std::chrono::duration<double>(kCacheCleanPeriodForKeyRotation)) {
kek_write_cache_per_token().Clear();
last_cache_clean_for_key_rotation_time_ = now;
}
lock.Unlock();
std::shared_ptr<FileKeyMaterialStore> key_material_store =
FileSystemKeyMaterialStore::Make(parquet_file_path, file_system, false);
// Unwrapper for decrypting encrypted keys
FileKeyUnwrapper file_key_unwrapper(this, kms_connection_config, cache_lifetime_seconds,
key_material_store);
// Create a temporary store to hold new key material during rotation,
// and wrapper that will write material to this store when getting key metadata.
std::shared_ptr<FileKeyMaterialStore> temp_key_material_store =
FileSystemKeyMaterialStore::Make(parquet_file_path, file_system, true);
FileKeyWrapper file_key_wrapper(this, kms_connection_config, temp_key_material_store,
cache_lifetime_seconds, double_wrapping);
std::vector<std::string> file_key_id_set = key_material_store->GetKeyIDSet();
// When writing new encryption material, we re-use the same key identifiers
// so that key metadata does not need to change.
// Start with footer key (to get KMS ID, URL, if needed).
// We can rely on the footer key using a standardised key identifier.
std::string footer_key_id_str = std::string(KeyMaterial::kFooterKeyIdInFile);
std::string key_material_string = key_material_store->GetKeyMaterial(footer_key_id_str);
KeyWithMasterId key =
file_key_unwrapper.GetDataEncryptionKey(KeyMaterial::Parse(key_material_string));
file_key_wrapper.GetEncryptionKeyMetadata(key.data_key(), key.master_id(), true,
std::string(KeyMaterial::kFooterKeyIdInFile));
// Rotate column keys
for (const auto& key_id_in_file : file_key_id_set) {
if (key_id_in_file == std::string(KeyMaterial::kFooterKeyIdInFile)) {
continue;
}
key_material_string = key_material_store->GetKeyMaterial(key_id_in_file);
KeyWithMasterId column_key =
file_key_unwrapper.GetDataEncryptionKey(KeyMaterial::Parse(key_material_string));
file_key_wrapper.GetEncryptionKeyMetadata(
column_key.data_key(), column_key.master_id(), false, key_id_in_file);
}
// Save material to the temporary store then move it to the original store location
temp_key_material_store->SaveMaterial();
key_material_store->RemoveMaterial();
temp_key_material_store->MoveMaterialTo(key_material_store);
}
// Flush any caches that are tied to the (compromised) access_token
void KeyToolkit::RemoveCacheEntriesForToken(const std::string& access_token) {
kms_client_cache_per_token().Remove(access_token);
kek_write_cache_per_token().Remove(access_token);
kek_read_cache_per_token().Remove(access_token);
}
void KeyToolkit::RemoveCacheEntriesForAllTokens() {
kms_client_cache_per_token().Clear();
kek_write_cache_per_token().Clear();
kek_read_cache_per_token().Clear();
}
} // namespace parquet::encryption