PARQUET-1939: Fix remote KMS client ambiguity (#841)
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index 87beeb9..2ad1e09 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -418,13 +418,6 @@
---
-**Property:** `parquet.encryption.wrap.locally`
-**Description:** Wrap keys locally - master keys are fetched from the KMS server and used to encrypt other keys (DEKs or KEKs).
-If `false` - key wrapping will be performed by a KMS server.
-**Default value:** `false`
-
----
-
**Property:** `parquet.encryption.key.material.store.internally`
**Description:** Store key material inside Parquet file footers; this mode doesn’t produce additional files.
If `false`, key material is stored in separate new files, created in the same folder - this mode enables key rotation for immutable Parquet files.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
index 668d9db..ea8a026 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
@@ -69,11 +69,6 @@
*/
public static final String CACHE_LIFETIME_PROPERTY_NAME = "parquet.encryption.cache.lifetime.seconds";
/**
- * Wrap keys locally - master keys are fetched from the KMS server and used to encrypt other keys (DEKs or KEKs).
- * By default, false - key wrapping will be performed by a KMS server.
- */
- public static final String WRAP_LOCALLY_PROPERTY_NAME = "parquet.encryption.wrap.locally";
- /**
* Store key material inside Parquet file footers; this mode doesn’t produce additional files.
* By default, true. If set to false, key material is stored in separate files in the same folder,
* which enables key rotation for immutable Parquet files.
@@ -92,7 +87,6 @@
public static final boolean DOUBLE_WRAPPING_DEFAULT = true;
public static final long CACHE_LIFETIME_DEFAULT_SECONDS = 10 * 60; // 10 minutes
- public static final boolean WRAP_LOCALLY_DEFAULT = false;
public static final boolean KEY_MATERIAL_INTERNAL_DEFAULT = true;
public static final int DATA_KEY_LENGTH_DEFAULT = 128;
public static final int KEK_LENGTH_DEFAULT = 128;
@@ -216,10 +210,6 @@
throw new UnsupportedOperationException("Key rotation is not supported for internal key material");
}
- if (hadoopConfig.getBoolean(WRAP_LOCALLY_PROPERTY_NAME, WRAP_LOCALLY_DEFAULT)) {
- throw new UnsupportedOperationException("Key rotation is not supported for local key wrapping");
- }
-
// If process wrote files with double-wrapped keys, clean KEK cache (since master keys are changing).
// Only once for each key rotation cycle; not for every folder
long currentTime = System.currentTimeMillis();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/LocalWrapKmsClient.java
similarity index 61%
rename from parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/LocalWrapKmsClient.java
index c8a7435..921a2db 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/LocalWrapKmsClient.java
@@ -33,18 +33,21 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
-
-public abstract class RemoteKmsClient implements KmsClient {
+/**
+ * Typically, KMS systems support in-server key wrapping. Their clients should implement KmsClient interface directly.
+ * An extension of the LocalWrapKmsClient class should used only in rare situations where in-server wrapping is not
+ * supported. The wrapping will be done locally then - the MEKs will be fetched from the KMS server via the
+ * getMasterKeyFromServer function, and used to encrypt a DEK or KEK inside the LocalWrapKmsClient code.
+ * Note: master key rotation is not supported with local wrapping.
+ */
+public abstract class LocalWrapKmsClient implements KmsClient {
public static final String LOCAL_WRAP_NO_KEY_VERSION = "NO_VERSION";
protected String kmsInstanceID;
protected String kmsInstanceURL;
protected String kmsToken;
- protected Boolean isWrapLocally;
protected Configuration hadoopConfiguration;
- protected boolean isDefaultToken;
// MasterKey cache: master keys per key ID (per KMS Client). For local wrapping only.
private ConcurrentMap<String, byte[]> masterKeyCache;
@@ -113,102 +116,52 @@
public void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken) {
this.kmsInstanceID = kmsInstanceID;
this.kmsInstanceURL = kmsInstanceURL;
-
- this.isWrapLocally = configuration.getBoolean(KeyToolkit.WRAP_LOCALLY_PROPERTY_NAME, KeyToolkit.WRAP_LOCALLY_DEFAULT);
- if (isWrapLocally) {
- masterKeyCache = new ConcurrentHashMap<>();
- }
-
+
+ masterKeyCache = new ConcurrentHashMap<>();
hadoopConfiguration = configuration;
kmsToken = accessToken;
- isDefaultToken = kmsToken.equals(KmsClient.KEY_ACCESS_TOKEN_DEFAULT);
-
initializeInternal();
}
@Override
public String wrapKey(byte[] key, String masterKeyIdentifier) throws KeyAccessDeniedException {
- if (isWrapLocally) {
- byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,
+ byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,
(k) -> getKeyFromServer(masterKeyIdentifier));
- byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
- String encryptedEncodedKey = KeyToolkit.encryptKeyLocally(key, masterKey, AAD);
- return LocalKeyWrap.createSerialized(encryptedEncodedKey);
- } else {
- refreshToken();
- return wrapKeyInServer(key, masterKeyIdentifier);
- }
+ byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+ String encryptedEncodedKey = KeyToolkit.encryptKeyLocally(key, masterKey, AAD);
+ return LocalKeyWrap.createSerialized(encryptedEncodedKey);
}
@Override
public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier) throws KeyAccessDeniedException {
- if (isWrapLocally) {
- LocalKeyWrap keyWrap = LocalKeyWrap.parse(wrappedKey);
- String masterKeyVersion = keyWrap.getMasterKeyVersion();
- if (!LOCAL_WRAP_NO_KEY_VERSION.equals(masterKeyVersion)) {
- throw new ParquetCryptoRuntimeException("Master key versions are not supported for local wrapping: "
- + masterKeyVersion);
- }
- String encryptedEncodedKey = keyWrap.getEncryptedKey();
- byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,
- (k) -> getKeyFromServer(masterKeyIdentifier));
- byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
- return KeyToolkit.decryptKeyLocally(encryptedEncodedKey, masterKey, AAD);
- } else {
- refreshToken();
- return unwrapKeyInServer(wrappedKey, masterKeyIdentifier);
+ LocalKeyWrap keyWrap = LocalKeyWrap.parse(wrappedKey);
+ String masterKeyVersion = keyWrap.getMasterKeyVersion();
+ if (!LOCAL_WRAP_NO_KEY_VERSION.equals(masterKeyVersion)) {
+ throw new ParquetCryptoRuntimeException("Master key versions are not supported for local wrapping: "
+ + masterKeyVersion);
}
- }
-
- private void refreshToken() {
- if (isDefaultToken) {
- return;
- }
- kmsToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME);
- if (stringIsEmpty(kmsToken)) {
- throw new ParquetCryptoRuntimeException("Empty token");
- }
+ String encryptedEncodedKey = keyWrap.getEncryptedKey();
+ byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,
+ (k) -> getKeyFromServer(masterKeyIdentifier));
+ byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+ return KeyToolkit.decryptKeyLocally(encryptedEncodedKey, masterKey, AAD);
}
private byte[] getKeyFromServer(String keyIdentifier) {
- refreshToken();
- return getMasterKeyFromServer(keyIdentifier);
+ // refresh token
+ kmsToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME);
+ byte[] key = getMasterKeyFromServer(keyIdentifier);
+ int keyLength = key.length;
+ if (!(16 == keyLength || 24 == keyLength || 32 == keyLength)) {
+ throw new ParquetCryptoRuntimeException( "Wrong length: "+ keyLength +
+ " of AES key: " + keyIdentifier);
+ }
+ return key;
}
/**
- * Wrap a key with the master key in the remote KMS server.
- *
- * If your KMS client code throws runtime exceptions related to access/permission problems
- * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
- *
- * @param keyBytes: key bytes to be wrapped
- * @param masterKeyIdentifier: a string that uniquely identifies the master key in a KMS instance
- * @return wrappedKey: Encrypts key bytes with the master key, encodes the result and potentially adds a KMS-specific metadata.
- * @throws KeyAccessDeniedException unauthorized to encrypt with the given master key
- * @throws UnsupportedOperationException KMS does not support in-server wrapping
- */
- protected abstract String wrapKeyInServer(byte[] keyBytes, String masterKeyIdentifier)
- throws KeyAccessDeniedException, UnsupportedOperationException;
-
- /**
- * Unwrap a key with the master key in the remote KMS server.
- *
- * If your KMS client code throws runtime exceptions related to access/permission problems
- * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
- *
- * @param wrappedKey String produced by wrapKey operation
- * @param masterKeyIdentifier: a string that uniquely identifies the master key in a KMS instance
- * @return key bytes
- * @throws KeyAccessDeniedException unauthorized to unwrap with the given master key
- * @throws UnsupportedOperationException KMS does not support in-server unwrapping
- */
- protected abstract byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier)
- throws KeyAccessDeniedException, UnsupportedOperationException;
-
- /**
* Get master key from the remote KMS server.
- * Required only for local wrapping. No need to implement if KMS supports in-server wrapping/unwrapping.
*
* If your KMS client code throws runtime exceptions related to access/permission problems
* (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
@@ -216,14 +169,13 @@
* @param masterKeyIdentifier: a string that uniquely identifies the master key in a KMS instance
* @return master key bytes
* @throws KeyAccessDeniedException unauthorized to get the master key
- * @throws UnsupportedOperationException If not implemented, or KMS does not support key fetching
*/
protected abstract byte[] getMasterKeyFromServer(String masterKeyIdentifier)
- throws KeyAccessDeniedException, UnsupportedOperationException;
+ throws KeyAccessDeniedException;
/**
* Pass configuration with KMS-specific parameters.
*/
protected abstract void initializeInternal()
throws KeyAccessDeniedException;
-}
\ No newline at end of file
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
index a9c078b..dc9b005 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
@@ -24,6 +24,7 @@
import org.apache.parquet.crypto.keytools.KeyToolkit;
import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS;
+import org.apache.parquet.crypto.keytools.mocks.LocalWrapInMemoryKMS;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetReader;
@@ -277,13 +278,16 @@
conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
PropertiesDrivenCryptoFactory.class.getName());
- conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName());
+ if (test.isWrapLocally) {
+ conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, LocalWrapInMemoryKMS.class.getName());
+ } else {
+ conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName());
+ }
conf.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, KEY_LIST);
conf.set(InMemoryKMS.NEW_KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST);
conf.setBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, test.isKeyMaterialInternalStorage);
conf.setBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, test.isDoubleWrapping);
- conf.setBoolean(KeyToolkit.WRAP_LOCALLY_PROPERTY_NAME, test.isWrapLocally);
return conf;
}
@@ -402,6 +406,10 @@
private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data, ExecutorService threadPool) throws IOException {
readFilesMultithreaded(root, data, threadPool, false/*keysRotated*/);
+ if (isWrapLocally) {
+ return; // key rotation is not supported with local key wrapping
+ }
+
LOG.info("--> Start master key rotation");
Configuration hadoopConfigForRotation =
EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER.getHadoopConfiguration(this);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
index 1aa1b14..e6ec17d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
@@ -26,8 +26,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.crypto.KeyAccessDeniedException;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
-import org.apache.parquet.crypto.keytools.RemoteKmsClient;
import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.KmsClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@
* This is a mock class, built for testing only. Don't use it as an example of KmsClient implementation.
* (VaultClient is the sample implementation).
*/
-public class InMemoryKMS extends RemoteKmsClient {
+public class InMemoryKMS implements KmsClient {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryKMS.class);
public static final String KEY_LIST_PROPERTY_NAME = "parquet.encryption.key.list";
@@ -57,9 +57,9 @@
}
@Override
- protected synchronized void initializeInternal() {
+ public synchronized void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken) {
// Parse master keys
- String[] masterKeys = hadoopConfiguration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME);
+ String[] masterKeys = configuration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME);
if (null == masterKeys || masterKeys.length == 0) {
throw new ParquetCryptoRuntimeException("No encryption key list");
}
@@ -91,7 +91,7 @@
}
@Override
- protected synchronized String wrapKeyInServer(byte[] keyBytes, String masterKeyIdentifier)
+ public synchronized String wrapKey(byte[] keyBytes, String masterKeyIdentifier)
throws KeyAccessDeniedException, UnsupportedOperationException {
// Always use the latest key version for writing
@@ -104,7 +104,7 @@
}
@Override
- protected synchronized byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier)
+ public synchronized byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier)
throws KeyAccessDeniedException, UnsupportedOperationException {
byte[] masterKey = masterKeyMap.get(masterKeyIdentifier);
if (null == masterKey) {
@@ -113,11 +113,4 @@
byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
return KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, AAD);
}
-
- @Override
- protected synchronized byte[] getMasterKeyFromServer(String masterKeyIdentifier)
- throws KeyAccessDeniedException, UnsupportedOperationException {
- // Always return the latest key version
- return newMasterKeyMap.get(masterKeyIdentifier);
- }
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/LocalWrapInMemoryKMS.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/LocalWrapInMemoryKMS.java
new file mode 100644
index 0000000..cdf27fd
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/LocalWrapInMemoryKMS.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools.mocks;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.LocalWrapKmsClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a mock class, built for testing only. Don't use it as an example of KmsClient implementation.
+ * (VaultClient is the sample implementation).
+ */
+public class LocalWrapInMemoryKMS extends LocalWrapKmsClient {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalWrapInMemoryKMS.class);
+
+ public static final String KEY_LIST_PROPERTY_NAME = "parquet.encryption.key.list";
+
+ private static Map<String,byte[]> masterKeyMap;
+
+ @Override
+ protected synchronized void initializeInternal() throws KeyAccessDeniedException {
+ // Parse master keys
+ String[] masterKeys = hadoopConfiguration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME);
+ if (null == masterKeys || masterKeys.length == 0) {
+ throw new ParquetCryptoRuntimeException("No encryption key list");
+ }
+ masterKeyMap = parseKeyList(masterKeys);
+ }
+
+ private static Map<String, byte[]> parseKeyList(String[] masterKeys) {
+ Map<String,byte[]> keyMap = new HashMap<>();
+
+ int nKeys = masterKeys.length;
+ for (int i=0; i < nKeys; i++) {
+ String[] parts = masterKeys[i].split(":");
+ String keyName = parts[0].trim();
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Key '" + keyName + "' is not formatted correctly");
+ }
+ String key = parts[1].trim();
+ try {
+ byte[] keyBytes = Base64.getDecoder().decode(key);
+ keyMap.put(keyName, keyBytes);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Could not decode key '" + keyName + "'!");
+ throw e;
+ }
+ }
+ return keyMap;
+ }
+
+ @Override
+ protected synchronized byte[] getMasterKeyFromServer(String masterKeyIdentifier)
+ throws KeyAccessDeniedException, UnsupportedOperationException {
+ return masterKeyMap.get(masterKeyIdentifier);
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
index 1e8507a..9eb9c51 100755
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
@@ -25,10 +25,11 @@
import okhttp3.RequestBody;
import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.crypto.KeyAccessDeniedException;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
import org.apache.parquet.crypto.keytools.KmsClient;
-import org.apache.parquet.crypto.keytools.RemoteKmsClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +43,9 @@
/**
* An example of KmsClient implementation. Not for production use!
*/
-public class VaultClient extends RemoteKmsClient {
+public class VaultClient implements KmsClient {
private static final Logger LOG = LoggerFactory.getLogger(VaultClient.class);
+
private static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
private static final String DEFAULT_TRANSIT_ENGINE = "/v1/transit/";
private static final String transitWrapEndpoint = "encrypt/";
@@ -51,16 +53,20 @@
private static final String tokenHeader="X-Vault-Token";
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private String kmsToken;
+ private Configuration hadoopConfiguration;
+
private String endPointPrefix;
private OkHttpClient httpClient = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
.build();
@Override
- protected void initializeInternal() {
- if (isDefaultToken) {
- throw new ParquetCryptoRuntimeException("Vault token not provided");
- }
+ public void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken)
+ throws KeyAccessDeniedException {
+ hadoopConfiguration = configuration;
+ checkToken(accessToken);
+ kmsToken = accessToken;
if (kmsInstanceURL.equals(KmsClient.KMS_INSTANCE_URL_DEFAULT)) {
throw new ParquetCryptoRuntimeException("Vault URL not provided");
@@ -82,31 +88,31 @@
}
@Override
- public String wrapKeyInServer(byte[] dataKey, String masterKeyIdentifier) {
+ public String wrapKey(byte[] keyBytes, String masterKeyIdentifier)
+ throws KeyAccessDeniedException {
+ refreshToken();
Map<String, String> writeKeyMap = new HashMap<String, String>(1);
- final String dataKeyStr = Base64.getEncoder().encodeToString(dataKey);
+ final String dataKeyStr = Base64.getEncoder().encodeToString(keyBytes);
writeKeyMap.put("plaintext", dataKeyStr);
- String response = getContentFromTransitEngine(endPointPrefix + transitWrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+ String response = getContentFromTransitEngine(endPointPrefix + transitWrapEndpoint,
+ buildPayload(writeKeyMap), masterKeyIdentifier);
String ciphertext = parseReturn(response, "ciphertext");
return ciphertext;
}
@Override
- public byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier) {
+ public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier)
+ throws KeyAccessDeniedException {
+ refreshToken();
Map<String, String> writeKeyMap = new HashMap<String, String>(1);
writeKeyMap.put("ciphertext", wrappedKey);
- String response = getContentFromTransitEngine(endPointPrefix + transitUnwrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+ String response = getContentFromTransitEngine(endPointPrefix + transitUnwrapEndpoint,
+ buildPayload(writeKeyMap), masterKeyIdentifier);
String plaintext = parseReturn(response, "plaintext");
final byte[] key = Base64.getDecoder().decode(plaintext);
return key;
}
- @Override
- protected byte[] getMasterKeyFromServer(String masterKeyIdentifier) {
- // Vault supports in-server wrapping and unwrapping. No need to fetch master keys.
- throw new UnsupportedOperationException("Use server wrap/unwrap, instead of fetching master keys (local wrap)");
- }
-
private String buildPayload(Map<String, String> paramMap) {
String jsonValue;
try {
@@ -117,6 +123,17 @@
return jsonValue;
}
+ private void checkToken(String token) {
+ if (null == token || token.isEmpty() || token.equals(KmsClient.KEY_ACCESS_TOKEN_DEFAULT)) {
+ throw new ParquetCryptoRuntimeException("Wrong Vault token : " + token);
+ }
+ }
+
+ private void refreshToken() {
+ kmsToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME);
+ checkToken(kmsToken);
+ }
+
private String getContentFromTransitEngine(String endPoint, String jPayload, String masterKeyIdentifier) {
LOG.info("masterKeyIdentifier: " + masterKeyIdentifier);
@@ -151,7 +168,6 @@
}
}
-
private static String parseReturn(String response, String searchKey) {
String matchingValue;
try {