blob: ea8a02615797f9aaf21fddb8dff616016303b41f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.crypto.keytools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.AesGcmDecryptor;
import org.apache.parquet.crypto.AesGcmEncryptor;
import org.apache.parquet.crypto.AesMode;
import org.apache.parquet.crypto.KeyAccessDeniedException;
import org.apache.parquet.crypto.ModuleCipherFactory;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.hadoop.BadConfigurationException;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import java.io.IOException;
import java.util.Base64;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
public class KeyToolkit {
/**
* Class implementing the KmsClient interface.
* KMS stands for “key management service”.
*/
public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "parquet.encryption.kms.client.class";
/**
* ID of the KMS instance that will be used for encryption (if multiple KMS instances are available).
*/
public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "parquet.encryption.kms.instance.id";
/**
* URL of the KMS instance.
*/
public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "parquet.encryption.kms.instance.url";
/**
* Authorization token that will be passed to KMS.
*/
public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "parquet.encryption.key.access.token";
/**
* Use double wrapping - where data encryption keys (DEKs) are encrypted with key encryption keys (KEKs),
* which in turn are encrypted with master keys.
* By default, true. If set to false, DEKs are directly encrypted with master keys, KEKs are not used.
*/
public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "parquet.encryption.double.wrapping";
/**
* Lifetime of cached entities (key encryption keys, local wrapping keys, KMS client objects).
*/
public static final String CACHE_LIFETIME_PROPERTY_NAME = "parquet.encryption.cache.lifetime.seconds";
/**
* Store key material inside Parquet file footers; this mode doesn’t produce additional files.
* By default, true. If set to false, key material is stored in separate files in the same folder,
* which enables key rotation for immutable Parquet files.
*/
public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "parquet.encryption.key.material.store.internally";
/**
* Length of data encryption keys (DEKs), randomly generated by parquet key management tools.
* Can be 128, 192 or 256 bits.
*/
public static final String DATA_KEY_LENGTH_PROPERTY_NAME = "parquet.encryption.data.key.length.bits";
/**
* Length of key encryption keys (KEKs), randomly generated by parquet key management tools.
* Can be 128, 192 or 256 bits.
*/
public static final String KEK_LENGTH_PROPERTY_NAME = "parquet.encryption.kek.length.bits";
public static final boolean DOUBLE_WRAPPING_DEFAULT = true;
public static final long CACHE_LIFETIME_DEFAULT_SECONDS = 10 * 60; // 10 minutes
public static final boolean KEY_MATERIAL_INTERNAL_DEFAULT = true;
public static final int DATA_KEY_LENGTH_DEFAULT = 128;
public static final int KEK_LENGTH_DEFAULT = 128;
private static long lastCacheCleanForKeyRotationTime = 0;
private static Object lastCacheCleanForKeyRotationTimeLock = new Object();
// KMS servers typically allow to run key rotation once in a few hours / a day.
// We clean KEK writer cache (if needed) after 1 hour
private static final int CACHE_CLEAN_PERIOD_FOR_KEY_ROTATION = 60 * 60 * 1000; // 1 hour
// KMS client two level cache: token -> KMSInstanceId -> KmsClient
static final TwoLevelCacheWithExpiration<KmsClient> KMS_CLIENT_CACHE_PER_TOKEN =
KmsClientCache.INSTANCE.getCache();
// KEK two level cache for wrapping: token -> MEK_ID -> KeyEncryptionKey
static final TwoLevelCacheWithExpiration<KeyEncryptionKey> KEK_WRITE_CACHE_PER_TOKEN =
KEKWriteCache.INSTANCE.getCache();
// KEK two level cache for unwrapping: token -> KEK_ID -> KEK bytes
static final TwoLevelCacheWithExpiration<byte[]> KEK_READ_CACHE_PER_TOKEN =
KEKReadCache.INSTANCE.getCache();
private enum KmsClientCache {
INSTANCE;
private final TwoLevelCacheWithExpiration<KmsClient> cache =
new TwoLevelCacheWithExpiration<>();
private TwoLevelCacheWithExpiration<KmsClient> getCache() {
return cache;
}
}
private enum KEKWriteCache {
INSTANCE;
private final TwoLevelCacheWithExpiration<KeyEncryptionKey> cache =
new TwoLevelCacheWithExpiration<>();
private TwoLevelCacheWithExpiration<KeyEncryptionKey> getCache() {
return cache;
}
}
private enum KEKReadCache {
INSTANCE;
private final TwoLevelCacheWithExpiration<byte[]> cache =
new TwoLevelCacheWithExpiration<>();
private TwoLevelCacheWithExpiration<byte[]> getCache() {
return cache;
}
}
static class KeyWithMasterID {
private final byte[] keyBytes;
private final String masterID ;
KeyWithMasterID(byte[] keyBytes, String masterID) {
this.keyBytes = keyBytes;
this.masterID = masterID;
}
byte[] getDataKey() {
return keyBytes;
}
String getMasterID() {
return masterID;
}
}
static class KeyEncryptionKey {
private final byte[] kekBytes;
private final byte[] kekID;
private String encodedKekID;
private final String encodedWrappedKEK;
KeyEncryptionKey(byte[] kekBytes, byte[] kekID, String encodedWrappedKEK) {
this.kekBytes = kekBytes;
this.kekID = kekID;
this.encodedWrappedKEK = encodedWrappedKEK;
}
byte[] getBytes() {
return kekBytes;
}
byte[] getID() {
return kekID;
}
String getEncodedID() {
if (null == encodedKekID) {
encodedKekID = Base64.getEncoder().encodeToString(kekID);
}
return encodedKekID;
}
String getEncodedWrappedKEK() {
return encodedWrappedKEK;
}
}
/**
* Key rotation. In the single wrapping mode, decrypts data keys with old master keys, then encrypts
* them with new master keys. In the double wrapping mode, decrypts KEKs (key encryption keys) with old
* master keys, generates new KEKs and encrypts them with new master keys.
* Works only if key material is not stored internally in file footers.
* Not supported in local key wrapping mode.
* Method can be run by multiple threads, but each thread must work on a different folder.
* @param folderPath parent path of Parquet files, whose keys will be rotated
* @param hadoopConfig Hadoop configuration
* @throws IOException I/O problems
* @throws ParquetCryptoRuntimeException General parquet encryption problems
* @throws KeyAccessDeniedException No access to master keys
* @throws UnsupportedOperationException Master key rotation not supported in the specific configuration
*/
public static void rotateMasterKeys(String folderPath, Configuration hadoopConfig)
throws IOException, ParquetCryptoRuntimeException, KeyAccessDeniedException, UnsupportedOperationException {
if (hadoopConfig.getBoolean(KEY_MATERIAL_INTERNAL_PROPERTY_NAME, KEY_MATERIAL_INTERNAL_DEFAULT)) {
throw new UnsupportedOperationException("Key rotation is not supported for internal key material");
}
// If process wrote files with double-wrapped keys, clean KEK cache (since master keys are changing).
// Only once for each key rotation cycle; not for every folder
long currentTime = System.currentTimeMillis();
synchronized (lastCacheCleanForKeyRotationTimeLock) {
if (currentTime - lastCacheCleanForKeyRotationTime > CACHE_CLEAN_PERIOD_FOR_KEY_ROTATION) {
KEK_WRITE_CACHE_PER_TOKEN.clear();
lastCacheCleanForKeyRotationTime = currentTime;
}
}
Path parentPath = new Path(folderPath);
FileSystem hadoopFileSystem = parentPath.getFileSystem(hadoopConfig);
if (!hadoopFileSystem.exists(parentPath) || !hadoopFileSystem.isDirectory(parentPath)) {
throw new ParquetCryptoRuntimeException("Couldn't rotate keys - folder doesn't exist or is not a directory: " + folderPath);
}
FileStatus[] parquetFilesInFolder = hadoopFileSystem.listStatus(parentPath, HiddenFileFilter.INSTANCE);
if (parquetFilesInFolder.length == 0) {
throw new ParquetCryptoRuntimeException("Couldn't rotate keys - no parquet files in folder " + folderPath);
}
for (FileStatus fs : parquetFilesInFolder) {
Path parquetFile = fs.getPath();
FileKeyMaterialStore keyMaterialStore = new HadoopFSKeyMaterialStore(hadoopFileSystem);
keyMaterialStore.initialize(parquetFile, hadoopConfig, false);
FileKeyUnwrapper fileKeyUnwrapper = new FileKeyUnwrapper(hadoopConfig, parquetFile, keyMaterialStore);
FileKeyMaterialStore tempKeyMaterialStore = new HadoopFSKeyMaterialStore(hadoopFileSystem);
tempKeyMaterialStore.initialize(parquetFile, hadoopConfig, true);
FileKeyWrapper fileKeyWrapper = new FileKeyWrapper(hadoopConfig, tempKeyMaterialStore);
Set<String> fileKeyIdSet = keyMaterialStore.getKeyIDSet();
// Start with footer key (to get KMS ID, URL, if needed)
String keyMaterialString = keyMaterialStore.getKeyMaterial(KeyMaterial.FOOTER_KEY_ID_IN_FILE);
KeyWithMasterID key = fileKeyUnwrapper.getDEKandMasterID(KeyMaterial.parse(keyMaterialString));
fileKeyWrapper.getEncryptionKeyMetadata(key.getDataKey(), key.getMasterID(), true,
KeyMaterial.FOOTER_KEY_ID_IN_FILE);
fileKeyIdSet.remove(KeyMaterial.FOOTER_KEY_ID_IN_FILE);
// Rotate column keys
for (String keyIdInFile : fileKeyIdSet) {
keyMaterialString = keyMaterialStore.getKeyMaterial(keyIdInFile);
key = fileKeyUnwrapper.getDEKandMasterID(KeyMaterial.parse(keyMaterialString));
fileKeyWrapper.getEncryptionKeyMetadata(key.getDataKey(), key.getMasterID(), false, keyIdInFile);
}
tempKeyMaterialStore.saveMaterial();
keyMaterialStore.removeMaterial();
tempKeyMaterialStore.moveMaterialTo(keyMaterialStore);
}
}
/**
* Flush any caches that are tied to the (compromised) accessToken
* @param accessToken access token
*/
public static void removeCacheEntriesForToken(String accessToken) {
KMS_CLIENT_CACHE_PER_TOKEN.removeCacheEntriesForToken(accessToken);
KEK_WRITE_CACHE_PER_TOKEN.removeCacheEntriesForToken(accessToken);
KEK_READ_CACHE_PER_TOKEN.removeCacheEntriesForToken(accessToken);
}
public static void removeCacheEntriesForAllTokens() {
KMS_CLIENT_CACHE_PER_TOKEN.clear();
KEK_WRITE_CACHE_PER_TOKEN.clear();
KEK_READ_CACHE_PER_TOKEN.clear();
}
/**
* Encrypts "key" with "masterKey", using AES-GCM and the "AAD"
* @param keyBytes the key to encrypt
* @param masterKeyBytes encryption key
* @param AAD additional authenticated data
* @return base64 encoded encrypted key
*/
public static String encryptKeyLocally(byte[] keyBytes, byte[] masterKeyBytes, byte[] AAD) {
AesGcmEncryptor keyEncryptor;
keyEncryptor = (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, masterKeyBytes);
byte[] encryptedKey = keyEncryptor.encrypt(false, keyBytes, AAD);
return Base64.getEncoder().encodeToString(encryptedKey);
}
/**
* Decrypts encrypted key with "masterKey", using AES-GCM and the "AAD"
* @param encodedEncryptedKey base64 encoded encrypted key
* @param masterKeyBytes encryption key
* @param AAD additional authenticated data
* @return decrypted key
*/
public static byte[] decryptKeyLocally(String encodedEncryptedKey, byte[] masterKeyBytes, byte[] AAD) {
byte[] encryptedKey = Base64.getDecoder().decode(encodedEncryptedKey);
AesGcmDecryptor keyDecryptor;
keyDecryptor = (AesGcmDecryptor) ModuleCipherFactory.getDecryptor(AesMode.GCM, masterKeyBytes);
return keyDecryptor.decrypt(encryptedKey, 0, encryptedKey.length, AAD);
}
static KmsClient getKmsClient(String kmsInstanceID, String kmsInstanceURL, Configuration configuration,
String accessToken, long cacheEntryLifetime) {
ConcurrentMap<String, KmsClient> kmsClientPerKmsInstanceCache =
KMS_CLIENT_CACHE_PER_TOKEN.getOrCreateInternalCache(accessToken, cacheEntryLifetime);
KmsClient kmsClient =
kmsClientPerKmsInstanceCache.computeIfAbsent(kmsInstanceID,
(k) -> createAndInitKmsClient(configuration, kmsInstanceID, kmsInstanceURL, accessToken));
return kmsClient;
}
private static KmsClient createAndInitKmsClient(Configuration configuration, String kmsInstanceID,
String kmsInstanceURL, String accessToken) {
Class<?> kmsClientClass = null;
KmsClient kmsClient = null;
try {
kmsClientClass = ConfigurationUtil.getClassFromConfig(configuration,
KMS_CLIENT_CLASS_PROPERTY_NAME, KmsClient.class);
if (null == kmsClientClass) {
throw new ParquetCryptoRuntimeException("Unspecified " + KMS_CLIENT_CLASS_PROPERTY_NAME);
}
kmsClient = (KmsClient)kmsClientClass.newInstance();
} catch (InstantiationException | IllegalAccessException | BadConfigurationException e) {
throw new ParquetCryptoRuntimeException("Could not instantiate KmsClient class: "
+ kmsClientClass, e);
}
kmsClient.initialize(configuration, kmsInstanceID, kmsInstanceURL, accessToken);
return kmsClient;
}
static String formatTokenForLog(String accessToken) {
int maxTokenDisplayLength = 5;
if (accessToken.length() <= maxTokenDisplayLength) {
return accessToken;
}
return accessToken.substring(accessToken.length() - maxTokenDisplayLength);
}
static boolean stringIsEmpty(String str) {
return (null == str) || str.isEmpty();
}
}