PARQUET-1373: Encryption key tools (#615)

* comments
* update key tools
* double wrap for minimizing KMS calls
* Add information about KMS instance ID to footer metadata. Then on file reading, KMS instance ID doesn't have to be provided in properties, but can be read from the metadata.
Add a RemoteKmsClient abstract class to assist in implementing KmsClients for remote KMSs that are accessed via URL.
Make DoubleWrappedKeyManager inherit from WrappedKeyManager and make FileKeyManager an abstract class. Add a static factory method to FileKeyManager to initialize an appropriate KmsClient and key manager.
The KMS URL should be specified in properties, either directly or in a list. The KMS instance ID is either the default, or is specified in properties or read from footer metadata.
* major update - key rotation, crypto factory, etc
* Change caches of EnvelopeKeyManager and EnvelopeKeyRetriever to be per-token.
KmsClient is per-token, and so are the read/write KEK caches.
Add default token value for InMemoryKMS, which has no tokens.
* Use ConcurrentHashMap for caches, with computeIfAbsent.
* Add expiration to the caches - both time-based and on-demand.
On expiration delete the per-token entries from caches.
Add method for cache invalidation per token.
Add abstract methods to be implemented by RemoteKmsClients.
* add in-memory KMS
* Change RemoteKmsClient exceptions to IOException
instead of the higher-level ParquetCryptoRuntimeException.
Change constant names to uppercase.
* Add sample VaultClient.
* interface changes
* Add okHttp3 dependency for VaultClient sample.
* wrapping changes
* Use JSON serialization for key material.
* separate write and read path, update caching
* improved refactoring
* key rotation improvements
* Add TestPropertiesDrivenEncryption
* get and refresh token for all KMS clients
* minor changes
* Use ConcurrentHashMap for caches
* caching and store updates
* Rename some encryption/decryption configurations and make the test parameterized
to test combinations of isKeyMaterialExternalStorage, isDoubleWrapping, isWrapLocally.
Add RemoteKmsClient mock for remote wrapping.
* add removeCacheEntriesForAllTokens
* Make common method setCommonKMSProperties and extract classname strings from classes
* Change TestPropertiesDrivenEncryption to accommodate latest API changes.
* Remove StringUtils
* address review comments
* key material documentation
* Boolean objects

Co-authored-by: Maya Anderson <mayaa@il.ibm.com>
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index 939a79a..457d326 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -239,6 +239,12 @@
 **Description:** Whether to write out page level checksums.  
 **Default value:** `true`
 
+---
+
+**Property:** `parquet.crypto.factory.class`  
+**Description:** Class implementing EncryptionPropertiesFactory.  
+**Default value:** None. If not set, the file won't be encrypted by a crypto factory.
+
 ## Class: ParquetInputFormat
 
 **Property:** `parquet.read.support.class`  
@@ -341,3 +347,92 @@
 **Property:** `parquet.compression.codec.zstd.workers`  
 **Description:** The number of threads that will be spawned to compress in parallel. More workers improve speed, but also increase memory usage. When it is 0, it works as single-threaded mode.  
 **Default value:** `0`
+
+## Class: HadoopReadOptions
+
+**Property:** `parquet.crypto.factory.class`  
+**Description:** Class implementing DecryptionPropertiesFactory.  
+**Default value:** None. If not set, the file won't be decrypted by a crypto factory.
+
+## Class: PropertiesDrivenCryptoFactory
+
+**Property:** `parquet.encryption.column.keys`  
+**Description:** List of columns to encrypt, with master key IDs (see HIVE-21848). Format: `<masterKeyID>:<colName>,<colName>;<masterKeyID>:<colName>...`  
+**Default value:** None. If neither `column.keys` nor `footer.key` is set, the file won't be encrypted by the PropertiesDrivenCryptoFactory. If only one of the two properties is set, an exception will be thrown.
+
+---
+
+**Property:** `parquet.encryption.footer.key`  
+**Description:** Master key ID for footer encryption/signing.  
+**Default value:** None. If neither `column.keys` nor `footer.key` is set, the file won't be encrypted by the PropertiesDrivenCryptoFactory. If only one of the two properties is set, an exception will be thrown.
+
+---
+
+**Property:** `parquet.encryption.algorithm`  
+**Description:** Parquet encryption algorithm. Can be `AES_GCM_V1` or `AES_GCM_CTR_V1`.  
+**Default value:** `AES_GCM_V1`
+
+---
+
+**Property:** `parquet.encryption.plaintext.footer`  
+**Description:** Write files in plaintext footer mode, which makes many footer fields visible (e.g. schema), but allows legacy readers to access unencrypted columns. The plaintext footer is signed with the footer key.  
+If `false`, write files in encrypted footer mode, which fully encrypts the footer and signs it with the footer key.  
+**Default value:** `false`
+
+---
+
+**Property:** `parquet.encryption.kms.client.class`  
+**Description:** Class implementing the KmsClient interface. "KMS" stands for “key management service”. The client will interact with a KMS server to wrap/unwrap encryption keys.  
+**Default value:** None
+
+---
+
+**Property:** `parquet.encryption.kms.instance.id`  
+**Description:** ID of the KMS instance that will be used for encryption (if multiple KMS instances are available).  
+**Default value:** `DEFAULT`
+
+---
+
+**Property:** `parquet.encryption.kms.instance.url`  
+**Description:** URL of the KMS instance.  
+**Default value:** `DEFAULT`
+
+---
+
+**Property:** `parquet.encryption.key.access.token`  
+**Description:** Authorization token that will be passed to the KMS.  
+**Default value:** None
+
+---
+
+**Property:** `parquet.encryption.double.wrapping`  
+**Description:** Use double wrapping, where data encryption keys (DEKs) are encrypted with key encryption keys (KEKs), which in turn are encrypted with master keys.  
+If `false`, DEKs are directly encrypted with master keys; KEKs are not used.  
+**Default value:** `true`
+
+---
+
+**Property:** `parquet.encryption.cache.lifetime.seconds`  
+**Description:** Lifetime of cached entities (key encryption keys, local wrapping keys, KMS client objects).  
+**Default value:** `600` (10 minutes)
+
+---
+
+**Property:** `parquet.encryption.wrap.locally`  
+**Description:** Wrap keys locally: master keys are fetched from the KMS server and used to encrypt other keys (DEKs or KEKs).  
+If `false`, key wrapping will be performed by a KMS server.  
+**Default value:** `false`
+
+---
+
+**Property:** `parquet.encryption.key.material.store.internally`  
+**Description:** Store key material inside Parquet file footers; this mode doesn’t produce additional files.  
+If `false`, key material is stored in separate files, created in the same folder; this mode enables key rotation for immutable Parquet files.  
+**Default value:** `true`
+
+---
+
+**Property:** `parquet.encryption.data.key.length.bits`  
+**Description:** Length of data encryption keys (DEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits.  
+**Default value:** `128`
+
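
As an illustration only (not part of this diff): a write-side job could set these properties on its Hadoop Configuration roughly as follows. The factory class path, the KMS client class and the master key IDs below are hypothetical placeholders; only the property names are taken from the README section above.

import org.apache.hadoop.conf.Configuration;

public class EncryptionConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Crypto factory that reads the parquet.encryption.* properties
    conf.set("parquet.crypto.factory.class",
        "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");
    // KmsClient implementation (hypothetical class name)
    conf.set("parquet.encryption.kms.client.class", "com.example.MyKmsClient");
    // Master key IDs ("kf", "k1", "k2") are made-up examples, defined by the KMS
    conf.set("parquet.encryption.footer.key", "kf");
    conf.set("parquet.encryption.column.keys", "k1:ssn,address;k2:salary");
    // Optional settings, shown here with their default values
    conf.set("parquet.encryption.algorithm", "AES_GCM_V1");
    conf.setBoolean("parquet.encryption.double.wrapping", true);
  }
}
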
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index af2ac17..243aa1d 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -127,6 +127,12 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <version>4.6.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
index 683155d..ab1baa4 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
@@ -22,10 +22,14 @@
 import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.EncryptionAlgorithm;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.HashMap;
 
 public class InternalFileDecryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(InternalFileDecryptor.class);
 
   private final FileDecryptionProperties fileDecryptionProperties;
   private final DecryptionKeyRetriever keyRetriever;
@@ -206,6 +210,10 @@
         throw new ParquetCryptoRuntimeException("Decryptor re-use: Different footer key metadata");
       }
     }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("File Decryptor. Algo: {}. Encrypted footer: {}", algorithm, encryptedFooter);
+    }
   }
 
   public InternalColumnDecryptionSetup setColumnCryptoMetadata(ColumnPath path, boolean encrypted, 
@@ -240,8 +248,11 @@
         }
         columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, true, 
             getDataModuleDecryptor(null), getThriftModuleDecryptor(null), columnOrdinal, null);
-      } else {
-        // Column is encrypted with column-specific key
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Column decryption (footer key): {}", path);
+        }
+      } else { // Column is encrypted with column-specific key
         byte[] columnKeyBytes = fileDecryptionProperties.getColumnKey(path);
         if ((null == columnKeyBytes) && (null != keyMetadata) && (null != keyRetriever)) {
           // No explicit column key given via API. Retrieve via key metadata.
@@ -251,12 +262,14 @@
             throw new KeyAccessDeniedException("Column " + path + ": key access denied", e);
           }
         }
-
-        if (null == columnKeyBytes) { // Hidden column: encrypted, but key unavailable
-          throw new ParquetCryptoRuntimeException("Column " + path + ": key unavailable");
-        } else { // Key is available
-          columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, false, 
+        if (null == columnKeyBytes) {
+          throw new ParquetCryptoRuntimeException("Column " + path + " is encrypted with NULL column key");
+        }
+        columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, false, 
               getDataModuleDecryptor(columnKeyBytes), getThriftModuleDecryptor(columnKeyBytes), columnOrdinal, keyMetadata);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Column decryption (column key): {}", path);
         }
       }
     }
@@ -300,4 +313,3 @@
     return fileDecryptionProperties;
   }
 }
-
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
index c167a5e..d9619d1 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
@@ -22,11 +22,15 @@
 import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.FileCryptoMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.format.EncryptionAlgorithm;
 
 import java.util.HashMap;
+import java.util.Map;
 
 public class InternalFileEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(InternalFileEncryptor.class);
 
   private final EncryptionAlgorithm algorithm;
   private final FileEncryptionProperties fileEncryptionProperties;
@@ -42,6 +46,9 @@
 
   public InternalFileEncryptor(FileEncryptionProperties fileEncryptionProperties) {
     this.fileEncryptionProperties = fileEncryptionProperties;
+    if (LOG.isDebugEnabled()) {
+      fileEncryptorLog();
+    }
     algorithm = fileEncryptionProperties.getAlgorithm();
     footerKey = fileEncryptionProperties.getFooterKey();
     encryptFooter =  fileEncryptionProperties.encryptedFooter();
@@ -171,4 +178,19 @@
     }
     return (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
   }
+
+  private void fileEncryptorLog() {
+    String encryptedColumnList;
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = fileEncryptionProperties.getEncryptedColumns();
+    if (null != columnPropertyMap) {
+      encryptedColumnList = "";
+      for (Map.Entry<ColumnPath, ColumnEncryptionProperties> entry : columnPropertyMap.entrySet()) {
+        encryptedColumnList += entry.getKey() + "; ";
+      }
+    } else {
+      encryptedColumnList = "Every column will be encrypted with footer key.";
+    }
+    LOG.debug("File Encryptor. Algo: {}. Encrypted footer: {}.  Encrypted columns: {}", 
+        fileEncryptionProperties.getAlgorithm(), fileEncryptionProperties.encryptedFooter(), encryptedColumnList);
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyMaterialStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyMaterialStore.java
new file mode 100644
index 0000000..b9db944
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyMaterialStore.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public interface FileKeyMaterialStore {
+
+  /**
+   * Initializes key material store for a parquet file.
+   * @param parquetFilePath Parquet file path
+   * @param hadoopConfig Hadoop configuration
+   * @param tempStore set true if this is a temporary store, used in key rotation
+   */
+  public void initialize(Path parquetFilePath, Configuration hadoopConfig, boolean tempStore);
+
+  /**
+   * Add key material for one encryption key.
+   * @param keyIDInFile ID of the key in Parquet file
+   * @param keyMaterial key material
+   */
+  public void addKeyMaterial(String keyIDInFile, String keyMaterial);
+
+  /**
+   * After key material was added for all keys in the given Parquet file,
+   * save material in persistent store.
+   */
+  public void saveMaterial();
+
+  /**
+   * Get key material
+   * @param keyIDInFile ID of a key in Parquet file
+   * @return key material
+   */
+  public String getKeyMaterial(String keyIDInFile);
+
+  /**
+   * @return Set of all key IDs in this store (for the given Parquet file)
+   */
+  public Set<String> getKeyIDSet();
+
+  /**
+   * Remove key material from persistent store. Used in key rotation.
+   */
+  public void removeMaterial();
+
+  /**
+   * Move key material to another store. Used in key rotation.
+   * @param targetKeyMaterialStore target store
+   */
+  public void moveMaterialTo(FileKeyMaterialStore targetKeyMaterialStore);
+}
\ No newline at end of file
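
A minimal illustration of the FileKeyMaterialStore contract (not part of this PR): an in-memory store that keeps the ID-to-material map without persistence. The HadoopFSKeyMaterialStore added below is the real, file-backed implementation.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.keytools.FileKeyMaterialStore;

public class InMemoryKeyMaterialStore implements FileKeyMaterialStore {
  private final Map<String, String> material = new HashMap<>();

  @Override
  public void initialize(Path parquetFilePath, Configuration hadoopConfig, boolean tempStore) {
    // Nothing to prepare for a purely in-memory store
  }

  @Override
  public void addKeyMaterial(String keyIDInFile, String keyMaterial) {
    material.put(keyIDInFile, keyMaterial);
  }

  @Override
  public void saveMaterial() {
    // No persistent backing store to flush to
  }

  @Override
  public String getKeyMaterial(String keyIDInFile) {
    return material.get(keyIDInFile);
  }

  @Override
  public Set<String> getKeyIDSet() {
    return material.keySet();
  }

  @Override
  public void removeMaterial() {
    material.clear();
  }

  @Override
  public void moveMaterialTo(FileKeyMaterialStore targetKeyMaterialStore) {
    for (Map.Entry<String, String> entry : material.entrySet()) {
      targetKeyMaterialStore.addKeyMaterial(entry.getKey(), entry.getValue());
    }
    targetKeyMaterialStore.saveMaterial();
    material.clear();
  }
}
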
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
new file mode 100644
index 0000000..60bc77f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyUnwrapper.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyWithMasterID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+import static org.apache.parquet.crypto.keytools.KeyToolkit.KMS_CLIENT_CACHE_PER_TOKEN;
+import static org.apache.parquet.crypto.keytools.KeyToolkit.KEK_READ_CACHE_PER_TOKEN;
+
+public class FileKeyUnwrapper implements DecryptionKeyRetriever {
+  private static final Logger LOG = LoggerFactory.getLogger(FileKeyUnwrapper.class);
+
+  // A map of KEK_ID -> KEK bytes, for the current token
+  private final ConcurrentMap<String, byte[]> kekPerKekID;
+
+  private KmsClient kmsClient = null;
+  private FileKeyMaterialStore keyMaterialStore = null;
+  private boolean checkedKeyMaterialInternalStorage = false;
+  private final Configuration hadoopConfiguration;
+  private final Path parquetFilePath;
+  private final String accessToken;
+  private final long cacheEntryLifetime;
+
+  FileKeyUnwrapper(Configuration hadoopConfiguration, Path filePath) {
+    this.hadoopConfiguration = hadoopConfiguration;
+    this.parquetFilePath = filePath;
+
+    cacheEntryLifetime = 1000L * hadoopConfiguration.getLong(KeyToolkit.CACHE_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.CACHE_LIFETIME_DEFAULT_SECONDS);
+
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME,
+        KmsClient.KEY_ACCESS_TOKEN_DEFAULT);
+
+    // Check cache upon each file reading (clean once in cacheEntryLifetime)
+    KMS_CLIENT_CACHE_PER_TOKEN.checkCacheForExpiredTokens(cacheEntryLifetime);
+    KEK_READ_CACHE_PER_TOKEN.checkCacheForExpiredTokens(cacheEntryLifetime);
+    kekPerKekID = KEK_READ_CACHE_PER_TOKEN.getOrCreateInternalCache(accessToken, cacheEntryLifetime);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating file key unwrapper. KeyMaterialStore: {}; token snippet: {}",
+          keyMaterialStore, KeyToolkit.formatTokenForLog(accessToken));
+    }
+  }
+
+  FileKeyUnwrapper(Configuration hadoopConfiguration, Path filePath, FileKeyMaterialStore keyMaterialStore) {
+    this(hadoopConfiguration, filePath);
+    this.keyMaterialStore = keyMaterialStore;
+    checkedKeyMaterialInternalStorage = true;
+  }
+
+  @Override
+  public byte[] getKey(byte[] keyMetadataBytes) {
+    KeyMetadata keyMetadata = KeyMetadata.parse(keyMetadataBytes);
+
+    if (!checkedKeyMaterialInternalStorage) {
+      if (!keyMetadata.keyMaterialStoredInternally()) {
+        try {
+          keyMaterialStore = new HadoopFSKeyMaterialStore(parquetFilePath.getFileSystem(hadoopConfiguration));
+          keyMaterialStore.initialize(parquetFilePath, hadoopConfiguration, false);
+        } catch (IOException e) {
+          throw new ParquetCryptoRuntimeException("Failed to open key material store", e);
+        }
+      }
+      checkedKeyMaterialInternalStorage = true;
+    }
+
+    KeyMaterial keyMaterial;
+    if (keyMetadata.keyMaterialStoredInternally()) {
+      // Internal key material storage: key material is inside key metadata
+      keyMaterial = keyMetadata.getKeyMaterial();
+    } else {
+      // External key material storage: key metadata contains a reference to a key in the material store
+      String keyIDinFile = keyMetadata.getKeyReference();
+      String keyMaterialString = keyMaterialStore.getKeyMaterial(keyIDinFile);
+      if (null == keyMaterialString) {
+        throw new ParquetCryptoRuntimeException("Null key material for keyIDinFile: " + keyIDinFile);
+      }
+      keyMaterial = KeyMaterial.parse(keyMaterialString);
+    }
+
+    return getDEKandMasterID(keyMaterial).getDataKey();
+  }
+
+  KeyWithMasterID getDEKandMasterID(KeyMaterial keyMaterial) {
+    if (null == kmsClient) {
+      kmsClient = getKmsClientFromConfigOrKeyMaterial(keyMaterial);
+    }
+
+    boolean doubleWrapping = keyMaterial.isDoubleWrapped();
+    String masterKeyID = keyMaterial.getMasterKeyID();
+    String encodedWrappedDEK = keyMaterial.getWrappedDEK();
+
+    byte[] dataKey;
+    if (!doubleWrapping) {
+      dataKey = kmsClient.unwrapKey(encodedWrappedDEK, masterKeyID);
+    } else {
+      // Get KEK
+      String encodedKekID = keyMaterial.getKekID();
+      String encodedWrappedKEK = keyMaterial.getWrappedKEK();
+
+      byte[] kekBytes = kekPerKekID.computeIfAbsent(encodedKekID,
+          (k) -> kmsClient.unwrapKey(encodedWrappedKEK, masterKeyID));
+
+      if (null == kekBytes) {
+        throw new ParquetCryptoRuntimeException("Null KEK, after unwrapping in KMS with master key " + masterKeyID);
+      }
+
+      // Decrypt the data key
+      byte[] AAD = Base64.getDecoder().decode(encodedKekID);
+      dataKey = KeyToolkit.decryptKeyLocally(encodedWrappedDEK, kekBytes, AAD);
+    }
+
+    return new KeyWithMasterID(dataKey, masterKeyID);
+  }
+
+  private KmsClient getKmsClientFromConfigOrKeyMaterial(KeyMaterial keyMaterial) {
+    String kmsInstanceID = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME);
+    if (stringIsEmpty(kmsInstanceID)) {
+      kmsInstanceID = keyMaterial.getKmsInstanceID();
+      if (null == kmsInstanceID) {
+        throw new ParquetCryptoRuntimeException("KMS instance ID is missing both in properties and file key material");
+      }
+    }
+
+    String kmsInstanceURL = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_URL_PROPERTY_NAME);
+    if (stringIsEmpty(kmsInstanceURL)) {
+      kmsInstanceURL = keyMaterial.getKmsInstanceURL();
+      if (null == kmsInstanceURL) {
+        throw new ParquetCryptoRuntimeException("KMS instance URL is missing both in properties and file key material");
+      }
+    }
+
+    KmsClient kmsClient = KeyToolkit.getKmsClient(kmsInstanceID, kmsInstanceURL, hadoopConfiguration, accessToken, cacheEntryLifetime);
+    if (null == kmsClient) {
+      throw new ParquetCryptoRuntimeException("KMSClient was not successfully created for reading encrypted data.");
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("File unwrapper - KmsClient: {}; InstanceId: {}; InstanceURL: {}", kmsClient, kmsInstanceID, kmsInstanceURL);
+    }
+    return kmsClient;
+  }
+}
\ No newline at end of file
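
A note on the AAD above: FileKeyWrapper (next file) encrypts the DEK with the raw KEK ID bytes as AAD and stores the KEK ID base64-encoded; the unwrapper decodes it back, so both sides use identical AAD bytes. A self-contained sketch of that round trip:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class AadRoundTripExample {
  public static void main(String[] args) {
    // Made-up 16-byte KEK ID; in the key tools it comes from SecureRandom
    byte[] kekID = "0123456789abcdef".getBytes(StandardCharsets.UTF_8);
    // Writer side: the encoded ID is what gets stored in the key material
    String encodedKekID = Base64.getEncoder().encodeToString(kekID);
    // Reader side: decoding restores the exact AAD bytes used at encryption time
    byte[] aad = Base64.getDecoder().decode(encodedKekID);
    System.out.println(Arrays.equals(kekID, aad)); // prints: true
  }
}
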
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
new file mode 100644
index 0000000..4886019
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/FileKeyWrapper.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit.KeyEncryptionKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.KMS_CLIENT_CACHE_PER_TOKEN;
+import static org.apache.parquet.crypto.keytools.KeyToolkit.KEK_WRITE_CACHE_PER_TOKEN;
+
+public class FileKeyWrapper {
+  private static final Logger LOG = LoggerFactory.getLogger(FileKeyWrapper.class);
+
+  public static final int KEK_LENGTH = 16;
+  public static final int KEK_ID_LENGTH = 16;
+
+  // A map of MEK_ID -> KeyEncryptionKey, for the current token
+  private final ConcurrentMap<String, KeyEncryptionKey> KEKPerMasterKeyID;
+
+  private final long cacheEntryLifetime;
+  private final KmsClient kmsClient;
+  private final String kmsInstanceID;
+  private final String kmsInstanceURL;
+  private final FileKeyMaterialStore keyMaterialStore;
+  private final Configuration hadoopConfiguration;
+  private final SecureRandom random;
+  private final boolean doubleWrapping;
+
+  private short keyCounter;
+  private String accessToken;
+
+  FileKeyWrapper(Configuration configuration, FileKeyMaterialStore keyMaterialStore) {
+    this.hadoopConfiguration = configuration;
+    this.keyMaterialStore = keyMaterialStore;
+
+    random = new SecureRandom();
+    keyCounter = 0;
+
+    cacheEntryLifetime = 1000L * hadoopConfiguration.getLong(KeyToolkit.CACHE_LIFETIME_PROPERTY_NAME,
+        KeyToolkit.CACHE_LIFETIME_DEFAULT_SECONDS);
+
+    kmsInstanceID = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_ID_PROPERTY_NAME,
+        KmsClient.KMS_INSTANCE_ID_DEFAULT);
+
+    doubleWrapping = hadoopConfiguration.getBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, KeyToolkit.DOUBLE_WRAPPING_DEFAULT);
+    accessToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME, KmsClient.KEY_ACCESS_TOKEN_DEFAULT);
+
+    kmsInstanceURL = hadoopConfiguration.getTrimmed(KeyToolkit.KMS_INSTANCE_URL_PROPERTY_NAME,
+        KmsClient.KMS_INSTANCE_URL_DEFAULT);
+
+    // Check caches upon each file writing (clean once in cacheEntryLifetime)
+    KMS_CLIENT_CACHE_PER_TOKEN.checkCacheForExpiredTokens(cacheEntryLifetime);
+    kmsClient = KeyToolkit.getKmsClient(kmsInstanceID, kmsInstanceURL, configuration, accessToken, cacheEntryLifetime);
+
+    if (doubleWrapping) {
+      KEK_WRITE_CACHE_PER_TOKEN.checkCacheForExpiredTokens(cacheEntryLifetime);
+      KEKPerMasterKeyID = KEK_WRITE_CACHE_PER_TOKEN.getOrCreateInternalCache(accessToken, cacheEntryLifetime);
+    } else {
+      KEKPerMasterKeyID = null;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating file key wrapper. KmsClient: {}; KmsInstanceId: {}; KmsInstanceURL: {}; doubleWrapping: {}; "
+          + "keyMaterialStore: {}; token snippet: {}", kmsClient, kmsInstanceID, kmsInstanceURL, doubleWrapping,
+          keyMaterialStore, KeyToolkit.formatTokenForLog(accessToken));
+    }
+  }
+
+  byte[] getEncryptionKeyMetadata(byte[] dataKey, String masterKeyID, boolean isFooterKey) {
+    return getEncryptionKeyMetadata(dataKey, masterKeyID, isFooterKey, null);
+  }
+
+  byte[] getEncryptionKeyMetadata(byte[] dataKey, String masterKeyID, boolean isFooterKey, String keyIdInFile) {
+    if (null == kmsClient) {
+      throw new ParquetCryptoRuntimeException("No KMS client available. See previous errors.");
+    }
+
+    String encodedKekID = null;
+    String encodedWrappedKEK = null;
+    String encodedWrappedDEK = null;
+    if (!doubleWrapping) {
+      encodedWrappedDEK = kmsClient.wrapKey(dataKey, masterKeyID);
+    } else {
+      // Find in cache, or generate KEK for Master Key ID
+      KeyEncryptionKey keyEncryptionKey = KEKPerMasterKeyID.computeIfAbsent(masterKeyID,
+          (k) -> createKeyEncryptionKey(masterKeyID));
+
+      // Encrypt DEK with KEK
+      byte[] AAD = keyEncryptionKey.getID();
+      encodedWrappedDEK = KeyToolkit.encryptKeyLocally(dataKey, keyEncryptionKey.getBytes(), AAD);
+      encodedKekID = keyEncryptionKey.getEncodedID();
+      encodedWrappedKEK = keyEncryptionKey.getEncodedWrappedKEK();
+    }
+
+    boolean storeKeyMaterialInternally = (null == keyMaterialStore);
+
+    String serializedKeyMaterial = KeyMaterial.createSerialized(isFooterKey, kmsInstanceID, kmsInstanceURL, masterKeyID,
+        doubleWrapping, encodedKekID, encodedWrappedKEK, encodedWrappedDEK, storeKeyMaterialInternally);
+
+    // Internal key material storage: key metadata and key material are the same
+    if (storeKeyMaterialInternally) {
+      return serializedKeyMaterial.getBytes(StandardCharsets.UTF_8);
+    }
+
+    // External key material storage: key metadata is a reference to a key in the material store
+    if (null == keyIdInFile) {
+      if (isFooterKey) {
+        keyIdInFile = KeyMaterial.FOOTER_KEY_ID_IN_FILE;
+      } else {
+        keyIdInFile = KeyMaterial.COLUMN_KEY_ID_IN_FILE_PREFIX + keyCounter;
+        keyCounter++;
+      }
+    }
+    keyMaterialStore.addKeyMaterial(keyIdInFile, serializedKeyMaterial);
+
+    String serializedKeyMetadata = KeyMetadata.createSerializedForExternalMaterial(keyIdInFile);
+
+    return serializedKeyMetadata.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private KeyEncryptionKey createKeyEncryptionKey(String masterKeyID) {
+    byte[] kekBytes = new byte[KEK_LENGTH];
+    random.nextBytes(kekBytes);
+
+    byte[] kekID = new byte[KEK_ID_LENGTH];
+    random.nextBytes(kekID);
+
+    // Encrypt KEK with Master key
+    String encodedWrappedKEK = kmsClient.wrapKey(kekBytes, masterKeyID);
+
+    return new KeyEncryptionKey(kekBytes, kekID, encodedWrappedKEK);
+  }
+}
\ No newline at end of file
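
The per-master-key KEK cache in FileKeyWrapper is what makes double wrapping cheap: within one cache lifetime, each master key costs at most one KMS wrapKey call, no matter how many files are written. A stripped-down sketch of the computeIfAbsent pattern used above (wrapViaKms is a made-up stand-in for the KMS interaction):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class KekCacheExample {
  private static final AtomicInteger kmsCalls = new AtomicInteger();

  private static byte[] wrapViaKms(String masterKeyID) {
    kmsCalls.incrementAndGet(); // pretend this is a remote KMS round trip
    return new byte[16];        // pretend this is a freshly wrapped KEK
  }

  public static void main(String[] args) {
    ConcurrentMap<String, byte[]> kekPerMasterKeyID = new ConcurrentHashMap<>();
    kekPerMasterKeyID.computeIfAbsent("masterKey1", KekCacheExample::wrapViaKms);
    kekPerMasterKeyID.computeIfAbsent("masterKey1", KekCacheExample::wrapViaKms);
    System.out.println(kmsCalls.get()); // prints: 1 - second call hit the cache
  }
}
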
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java
new file mode 100644
index 0000000..fae700a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class HadoopFSKeyMaterialStore implements FileKeyMaterialStore {
+
+  public final static String KEY_MATERIAL_FILE_PREFIX = "_KEY_MATERIAL_FOR_";
+  public static final String TEMP_FILE_PREFIX = "_TMP";
+  public final static String KEY_MATERIAL_FILE_SUFFFIX = ".json";
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private FileSystem hadoopFileSystem;
+  private Map<String, String> keyMaterialMap;
+  private Path keyMaterialFile;
+
+  HadoopFSKeyMaterialStore(FileSystem hadoopFileSystem) {
+    this.hadoopFileSystem = hadoopFileSystem;
+  }
+
+  @Override
+  public void initialize(Path parquetFilePath, Configuration hadoopConfig, boolean tempStore) {
+    String fullPrefix = (tempStore ? TEMP_FILE_PREFIX : "");
+    fullPrefix += KEY_MATERIAL_FILE_PREFIX;
+    keyMaterialFile = new Path(parquetFilePath.getParent(),
+      fullPrefix + parquetFilePath.getName() + KEY_MATERIAL_FILE_SUFFFIX);
+  }
+
+  @Override
+  public void addKeyMaterial(String keyIDInFile, String keyMaterial) throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      keyMaterialMap = new HashMap<>();
+    }
+    keyMaterialMap.put(keyIDInFile, keyMaterial);
+  }
+
+  @Override
+  public String getKeyMaterial(String keyIDInFile) throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      loadKeyMaterialMap();
+    }
+    return keyMaterialMap.get(keyIDInFile);
+  }
+
+  private void loadKeyMaterialMap() {
+    try (FSDataInputStream keyMaterialStream = hadoopFileSystem.open(keyMaterialFile)) {
+      JsonNode keyMaterialJson = objectMapper.readTree(keyMaterialStream);
+      keyMaterialMap = objectMapper.readValue(keyMaterialJson,
+        new TypeReference<Map<String, String>>() { });
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to get key material from " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public void saveMaterial() throws ParquetCryptoRuntimeException {
+    try (FSDataOutputStream keyMaterialStream = hadoopFileSystem.create(keyMaterialFile)) {
+      objectMapper.writeValue(keyMaterialStream, keyMaterialMap);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to save key material in " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public Set<String> getKeyIDSet() throws ParquetCryptoRuntimeException {
+    if (null == keyMaterialMap) {
+      loadKeyMaterialMap();
+    }
+
+    return keyMaterialMap.keySet();
+  }
+
+  @Override
+  public void removeMaterial() throws ParquetCryptoRuntimeException {
+    try {
+      hadoopFileSystem.delete(keyMaterialFile, false);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to delete file " + keyMaterialFile, e);
+    }
+  }
+
+  @Override
+  public void moveMaterialTo(FileKeyMaterialStore keyMaterialStore) throws ParquetCryptoRuntimeException {
+    // Currently supports only moving to a HadoopFSKeyMaterialStore
+    HadoopFSKeyMaterialStore targetStore;
+    try {
+      targetStore = (HadoopFSKeyMaterialStore) keyMaterialStore;
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException("Currently supports only moving to HadoopFSKeyMaterialStore, not to " +
+          keyMaterialStore.getClass(), e);
+    }
+    Path targetKeyMaterialFile = targetStore.getStorageFilePath();
+    try {
+      hadoopFileSystem.rename(keyMaterialFile, targetKeyMaterialFile);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to rename file " + keyMaterialFile, e);
+    }
+  }
+
+  private Path getStorageFilePath() {
+    return keyMaterialFile;
+  }
+}
\ No newline at end of file
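
For example, with external key material, a Parquet file /data/part-0.parquet gets a sibling file /data/_KEY_MATERIAL_FOR_part-0.parquet.json. A temporary store (tempStore = true) writes /data/_TMP_KEY_MATERIAL_FOR_part-0.parquet.json instead, and moveMaterialTo renames one store's file onto the other's path - the mechanism the key rotation flow relies on.
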
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyMaterial.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyMaterial.java
new file mode 100644
index 0000000..3d49ff4
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyMaterial.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+/**
+ * KeyMaterial class represents the "key material", keeping the information that allows readers to recover an encryption key (see
+ * description of the KeyMetadata class). The keytools package (PARQUET-1373) implements the "envelope encryption" pattern, in a
+ * "single wrapping" or "double wrapping" mode. In the single wrapping mode, the key material is generated by encrypting the
+ * "data encryption key" (DEK) by a "master key". In the double wrapping mode, the key material is generated by encrypting the DEK
+ * by a "key encryption key" (KEK), that in turn is encrypted by a "master key".
+ *
+ * Key material is kept in a flat json object, with the following fields:
+ * 1. "keyMaterialType" - a String, with the type of key material. In the current version, only one value is allowed - "PKMT1" (stands
+ *     for "parquet key management tools, version 1"). For external key material storage, this field is written in both "key metadata" and
+ *     "key material" jsons. For internal key material storage, this field is written only once in the common json.
+ * 2. "isFooterKey" - a boolean. If true, means that the material belongs to a file footer key, and keeps additional information (such as
+ *     KMS instance ID and URL). If false, means that the material belongs to a column key.
+ * 3. "kmsInstanceID" - a String, with the KMS Instance ID. Written only in footer key material.
+ * 4. "kmsInstanceURL" - a String, with the KMS Instance URL. Written only in footer key material.
+ * 5. "masterKeyID" - a String, with the ID of the master key used to generate the material.
+ * 6. "wrappedDEK" - a String, with the wrapped DEK (base64 encoding).
+ * 7. "doubleWrapping" - a boolean. If true, means that the material was generated in double wrapping mode.
+ *     If false - in single wrapping mode.
+ * 8. "keyEncryptionKeyID" - a String, with the ID of the KEK used to generate the material. Written only in double wrapping mode.
+ * 9. "wrappedKEK" - a String, with the wrapped KEK (base64 encoding). Written only in double wrapping mode.
+ */
+public class KeyMaterial {
+  static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
+  static final String KEY_MATERIAL_TYPE1 = "PKMT1";
+
+  static final String FOOTER_KEY_ID_IN_FILE = "footerKey";
+  static final String COLUMN_KEY_ID_IN_FILE_PREFIX = "columnKey";
+
+  private static final String IS_FOOTER_KEY_FIELD = "isFooterKey";
+  private static final String DOUBLE_WRAPPING_FIELD = "doubleWrapping";
+  private static final String KMS_INSTANCE_ID_FIELD = "kmsInstanceID";
+  private static final String KMS_INSTANCE_URL_FIELD = "kmsInstanceURL";
+  private static final String MASTER_KEY_ID_FIELD = "masterKeyID";
+  private static final String WRAPPED_DEK_FIELD = "wrappedDEK";
+  private static final String KEK_ID_FIELD = "keyEncryptionKeyID";
+  private static final String WRAPPED_KEK_FIELD = "wrappedKEK";
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final boolean isFooterKey;
+  private final String kmsInstanceID;
+  private final String kmsInstanceURL;
+  private final String masterKeyID;
+  private final boolean isDoubleWrapped;
+  private final String kekID;
+  private final String encodedWrappedKEK;
+  private final String encodedWrappedDEK;
+
+  private KeyMaterial(boolean isFooterKey, String kmsInstanceID, String kmsInstanceURL, String masterKeyID,
+      boolean isDoubleWrapped, String kekID, String encodedWrappedKEK, String encodedWrappedDEK) {
+    this.isFooterKey = isFooterKey;
+    this.kmsInstanceID = kmsInstanceID;
+    this.kmsInstanceURL = kmsInstanceURL;
+    this.masterKeyID = masterKeyID;
+    this.isDoubleWrapped = isDoubleWrapped;
+    this.kekID = kekID;
+    this.encodedWrappedKEK = encodedWrappedKEK;
+    this.encodedWrappedDEK = encodedWrappedDEK;
+  }
+
+  // parses external key material
+  static KeyMaterial parse(String keyMaterialString) {
+    Map<String, Object> keyMaterialJson = null;
+    try {
+      keyMaterialJson = OBJECT_MAPPER.readValue(new StringReader(keyMaterialString),
+          new TypeReference<Map<String, Object>>() {});
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to parse key material " + keyMaterialString, e);
+    }
+    // 1. External key material - extract "key material type", and make sure it is supported
+    String keyMaterialType = (String) keyMaterialJson.get(KEY_MATERIAL_TYPE_FIELD);
+    if (!KEY_MATERIAL_TYPE1.equals(keyMaterialType)) {
+      throw new ParquetCryptoRuntimeException("Wrong key material type: " + keyMaterialType +
+          " vs " + KEY_MATERIAL_TYPE1);
+    }
+    // Parse other fields (common to internal and external key material)
+    return parse(keyMaterialJson);
+  }
+
+  // parses fields common to internal and external key material
+  static KeyMaterial parse(Map<String, Object> keyMaterialJson) {
+    // 2. Check if "key material" belongs to file footer key
+    Boolean isFooterKey = (Boolean) keyMaterialJson.get(IS_FOOTER_KEY_FIELD);
+    String kmsInstanceID = null;
+    String kmsInstanceURL = null;
+    if (isFooterKey) {
+      // 3. For footer key, extract KMS Instance ID
+      kmsInstanceID = (String) keyMaterialJson.get(KMS_INSTANCE_ID_FIELD);
+      // 4. For footer key, extract KMS Instance URL
+      kmsInstanceURL = (String) keyMaterialJson.get(KMS_INSTANCE_URL_FIELD);
+    }
+    // 5. Extract master key ID
+    String masterKeyID = (String) keyMaterialJson.get(MASTER_KEY_ID_FIELD);
+    // 6. Extract wrapped DEK
+    String encodedWrappedDEK = (String) keyMaterialJson.get(WRAPPED_DEK_FIELD);
+    String kekID = null;
+    String encodedWrappedKEK = null;
+    // 7. Check if "key material" was generated in double wrapping mode
+    Boolean isDoubleWrapped = (Boolean) keyMaterialJson.get(DOUBLE_WRAPPING_FIELD);
+    if (isDoubleWrapped) {
+      // 8. In double wrapping mode, extract KEK ID
+      kekID = (String) keyMaterialJson.get(KEK_ID_FIELD);
+      // 9. In double wrapping mode, extract wrapped KEK
+      encodedWrappedKEK = (String) keyMaterialJson.get(WRAPPED_KEK_FIELD);
+    }
+
+    return new KeyMaterial(isFooterKey, kmsInstanceID, kmsInstanceURL, masterKeyID, isDoubleWrapped, kekID, encodedWrappedKEK, encodedWrappedDEK);
+  }
+
+  static String createSerialized(boolean isFooterKey, String kmsInstanceID, String kmsInstanceURL, String masterKeyID,
+      boolean isDoubleWrapped, String kekID, String encodedWrappedKEK, String encodedWrappedDEK, boolean isInternalStorage) {
+    Map<String, Object> keyMaterialMap = new HashMap<String, Object>(10);
+    // 1. Write "key material type"
+    keyMaterialMap.put(KEY_MATERIAL_TYPE_FIELD, KEY_MATERIAL_TYPE1);
+    if (isInternalStorage) {
+      // for internal storage, key material and key metadata are the same.
+      // adding the "internalStorage" field that belongs to KeyMetadata.
+      keyMaterialMap.put(KeyMetadata.KEY_MATERIAL_INTERNAL_STORAGE_FIELD, Boolean.TRUE);
+    }
+    // 2. Write isFooterKey
+    keyMaterialMap.put(IS_FOOTER_KEY_FIELD, Boolean.valueOf(isFooterKey));
+    if (isFooterKey) {
+      // 3. For footer key, write KMS Instance ID
+      keyMaterialMap.put(KMS_INSTANCE_ID_FIELD, kmsInstanceID);
+      // 4. For footer key, write KMS Instance URL
+      keyMaterialMap.put(KMS_INSTANCE_URL_FIELD, kmsInstanceURL);
+    }
+    // 5. Write master key ID
+    keyMaterialMap.put(MASTER_KEY_ID_FIELD, masterKeyID);
+    // 6. Write wrapped DEK
+    keyMaterialMap.put(WRAPPED_DEK_FIELD, encodedWrappedDEK);
+    // 7. Write isDoubleWrapped
+    keyMaterialMap.put(DOUBLE_WRAPPING_FIELD, Boolean.valueOf(isDoubleWrapped));
+    if (isDoubleWrapped) {
+      // 8. In double wrapping mode, write KEK ID
+      keyMaterialMap.put(KEK_ID_FIELD, kekID);
+      // 9. In double wrapping mode, write wrapped KEK
+      keyMaterialMap.put(WRAPPED_KEK_FIELD, encodedWrappedKEK);
+    }
+
+    try {
+      return OBJECT_MAPPER.writeValueAsString(keyMaterialMap);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to serialize key material", e);
+    }
+  }
+
+  boolean isFooterKey() {
+    return isFooterKey;
+  }
+
+  boolean isDoubleWrapped() {
+    return isDoubleWrapped;
+  }
+
+  String getMasterKeyID() {
+    return masterKeyID;
+  }
+
+  String getWrappedDEK() {
+    return encodedWrappedDEK;
+  }
+
+  String getKekID() {
+    return kekID;
+  }
+
+  String getWrappedKEK() {
+    return encodedWrappedKEK;
+  }
+
+  String getKmsInstanceID() {
+    return kmsInstanceID;
+  }
+
+  String getKmsInstanceURL() {
+    return kmsInstanceURL;
+  }
+}
\ No newline at end of file
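
As an illustration of the fields documented in the class javadoc: internal key material for a footer key in double wrapping mode would serialize to a flat JSON object along these lines (all values below are made up; the wrapped keys are base64 strings):

{"keyMaterialType":"PKMT1","internalStorage":true,"isFooterKey":true,
 "kmsInstanceID":"DEFAULT","kmsInstanceURL":"DEFAULT","masterKeyID":"kf",
 "doubleWrapping":true,"keyEncryptionKeyID":"S0VLLWlk...","wrappedKEK":"d3JhcHBlZA...",
 "wrappedDEK":"ZW5jcnlwdGVk..."}
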
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyMetadata.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyMetadata.java
new file mode 100644
index 0000000..f75fe7e
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyMetadata.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+/**
+ * Parquet encryption specification defines "key metadata" as an arbitrary byte array, generated by file writers for each encryption key,
+ * and passed to the low level API for storage in the file footer. The "key metadata" field is made available to file readers to enable
+ * recovery of the key. This simple interface can be utilized for implementation of any key management scheme.
+ *
+ * The keytools package (PARQUET-1373) implements one approach, of many possible, to key management and to generation of the "key metadata"
+ * fields. This approach, based on the "envelope encryption" pattern, allows working with KMS servers. It keeps the actual material,
+ * required to recover a key, in a "key material" object (see the KeyMaterial class for details).
+ *
+ * KeyMetadata class writes (and reads) the "key metadata" field as a flat json object, with the following fields:
+ * 1. "keyMaterialType" - a String, with the type of key material. In the current version, only one value is allowed - "PKMT1" (stands
+ *     for "parquet key management tools, version 1")
+ * 2. "internalStorage" - a boolean. If true, means that "key material" is kept inside the "key metadata" field. If false, "key material"
+ *     is kept externally (outside Parquet files) - in this case, "key metadata" keeps a reference to the external "key material".
+ * 3. "keyReference" - a String, with the reference to the external "key material". Written only if internalStorage is false.
+ *
+ * If internalStorage is true, "key material" is a part of "key metadata", and the json keeps additional fields, described in the
+ * KeyMaterial class.
+ */
+public class KeyMetadata {
+  static final String KEY_MATERIAL_INTERNAL_STORAGE_FIELD = "internalStorage";
+  private static final String KEY_REFERENCE_FIELD = "keyReference";
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final boolean isInternalStorage;
+  private final String keyReference;
+  private final KeyMaterial keyMaterial;
+
+  private KeyMetadata(boolean isInternalStorage, String keyReference, KeyMaterial keyMaterial) {
+    this.isInternalStorage = isInternalStorage;
+    this.keyReference = keyReference;
+    this.keyMaterial = keyMaterial;
+  }
+
+  static KeyMetadata parse(byte[] keyMetadataBytes) {
+    String keyMetaDataString = new String(keyMetadataBytes, StandardCharsets.UTF_8);
+    Map<String, Object> keyMetadataJson = null;
+    try {
+      keyMetadataJson = OBJECT_MAPPER.readValue(new StringReader(keyMetaDataString),
+          new TypeReference<Map<String, Object>>() {});
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to parse key metadata " + keyMetaDataString, e);
+    }
+
+    // 1. Extract "key material type", and make sure it is supported
+    String keyMaterialType = (String) keyMetadataJson.get(KeyMaterial.KEY_MATERIAL_TYPE_FIELD);
+    if (!KeyMaterial.KEY_MATERIAL_TYPE1.equals(keyMaterialType)) {
+      throw new ParquetCryptoRuntimeException("Wrong key material type: " + keyMaterialType +
+          " vs " + KeyMaterial.KEY_MATERIAL_TYPE1);
+    }
+
+    // 2. Check if "key material" is stored internally in Parquet file key metadata, or is stored externally
+    Boolean isInternalStorage = (Boolean) keyMetadataJson.get(KEY_MATERIAL_INTERNAL_STORAGE_FIELD);
+    String keyReference;
+    KeyMaterial keyMaterial;
+
+    if (isInternalStorage) {
+      // 3.1 "key material" is stored internally, inside "key metadata" - parse it
+      keyMaterial = KeyMaterial.parse(keyMetadataJson);
+      keyReference = null;
+    } else {
+      // 3.2 "key material" is stored externally. "key metadata" keeps a reference to it
+      keyReference = (String) keyMetadataJson.get(KEY_REFERENCE_FIELD);
+      keyMaterial = null;
+    }
+
+    return new KeyMetadata(isInternalStorage, keyReference, keyMaterial);
+  }
+
+  // For external material only. For internal material, create serialized KeyMaterial directly
+  static String createSerializedForExternalMaterial(String keyReference) {
+    Map<String, Object> keyMetadataMap = new HashMap<String, Object>(3);
+    // 1. Write "key material type"
+    keyMetadataMap.put(KeyMaterial.KEY_MATERIAL_TYPE_FIELD, KeyMaterial.KEY_MATERIAL_TYPE1);
+    // 2. Write internal storage as false
+    keyMetadataMap.put(KEY_MATERIAL_INTERNAL_STORAGE_FIELD, Boolean.FALSE);
+    // 3. For externally stored "key material", "key metadata" keeps only a reference to it
+    keyMetadataMap.put(KEY_REFERENCE_FIELD, keyReference);
+
+    try {
+      return OBJECT_MAPPER.writeValueAsString(keyMetadataMap);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to serialize key metadata", e);
+    }
+  }
+
+  boolean keyMaterialStoredInternally() {
+    return isInternalStorage;
+  }
+
+  KeyMaterial getKeyMaterial() {
+    return keyMaterial;
+  }
+
+  String getKeyReference() {
+    return keyReference;
+  }
+}
\ No newline at end of file
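
Correspondingly, when key material is stored externally, the key metadata written to the file footer reduces to a small reference object, e.g. (the keyReference follows the footerKey/columnKeyN naming constants in KeyMaterial):

{"keyMaterialType":"PKMT1","internalStorage":false,"keyReference":"columnKey3"}
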
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
new file mode 100644
index 0000000..6116bab
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.AesGcmDecryptor;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.AesMode;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+
+import java.io.IOException;
+
+import java.util.Base64;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+public class KeyToolkit {
+
+  /**
+   *  Class implementing the KmsClient interface.
+   *  KMS stands for “key management service”.
+   */
+  public static final String KMS_CLIENT_CLASS_PROPERTY_NAME = "parquet.encryption.kms.client.class";
+  /**
+   * ID of the KMS instance that will be used for encryption (if multiple KMS instances are available).
+   */
+  public static final String KMS_INSTANCE_ID_PROPERTY_NAME = "parquet.encryption.kms.instance.id";
+  /**
+   * URL of the KMS instance.
+   */
+  public static final String KMS_INSTANCE_URL_PROPERTY_NAME = "parquet.encryption.kms.instance.url";
+  /**
+   * Authorization token that will be passed to KMS.
+   */
+  public static final String KEY_ACCESS_TOKEN_PROPERTY_NAME = "parquet.encryption.key.access.token";
+  /**
+   * Use double wrapping - where data encryption keys (DEKs) are encrypted with key encryption keys (KEKs),
+   * which in turn are encrypted with master keys.
+   * By default, true. If set to false, DEKs are directly encrypted with master keys, KEKs are not used.
+   */
+  public static final String DOUBLE_WRAPPING_PROPERTY_NAME = "parquet.encryption.double.wrapping";
+  /**
+   * Lifetime of cached entities (key encryption keys, local wrapping keys, KMS client objects).
+   */
+  public static final String CACHE_LIFETIME_PROPERTY_NAME = "parquet.encryption.cache.lifetime.seconds";
+  /**
+   * Wrap keys locally - master keys are fetched from the KMS server and used to encrypt other keys (DEKs or KEKs).
+   * By default, false - key wrapping will be performed by a KMS server.
+   */
+  public static final String WRAP_LOCALLY_PROPERTY_NAME = "parquet.encryption.wrap.locally";
+  /**
+   * Store key material inside Parquet file footers; this mode doesn't produce additional files.
+   * By default, true. If set to false, key material is stored in separate files in the same folder,
+   * which enables key rotation for immutable Parquet files.
+   */
+  public static final String KEY_MATERIAL_INTERNAL_PROPERTY_NAME = "parquet.encryption.key.material.store.internally";
+  /**
+   * Length of data encryption keys (DEKs), randomly generated by parquet key management tools.
+   * Can be 128, 192 or 256 bits.
+   */
+  public static final String DATA_KEY_LENGTH_PROPERTY_NAME = "parquet.encryption.data.key.length.bits";
+
+  public static final boolean DOUBLE_WRAPPING_DEFAULT = true;
+  public static final long CACHE_LIFETIME_DEFAULT_SECONDS = 10 * 60; // 10 minutes
+  public static final boolean WRAP_LOCALLY_DEFAULT = false;
+  public static final boolean KEY_MATERIAL_INTERNAL_DEFAULT = true;
+  public static final int DATA_KEY_LENGTH_DEFAULT = 128;
+
+  private static long lastCacheCleanForKeyRotationTime = 0;
+  private static final Object lastCacheCleanForKeyRotationTimeLock = new Object();
+  // KMS servers typically allow running key rotation once every few hours / once a day.
+  // We clean the KEK writer cache (if needed) after 1 hour
+  private static final int CACHE_CLEAN_PERIOD_FOR_KEY_ROTATION = 60 * 60 * 1000; // 1 hour
+
+  // KMS client two level cache: token -> KMSInstanceId -> KmsClient
+  static final TwoLevelCacheWithExpiration<KmsClient> KMS_CLIENT_CACHE_PER_TOKEN =
+     KmsClientCache.INSTANCE.getCache();
+
+  // KEK two level cache for wrapping: token -> MEK_ID -> KeyEncryptionKey
+  static final TwoLevelCacheWithExpiration<KeyEncryptionKey> KEK_WRITE_CACHE_PER_TOKEN =
+      KEKWriteCache.INSTANCE.getCache();
+
+  // KEK two level cache for unwrapping: token -> KEK_ID -> KEK bytes
+  static final TwoLevelCacheWithExpiration<byte[]> KEK_READ_CACHE_PER_TOKEN =
+      KEKReadCache.INSTANCE.getCache();
+
+  private enum KmsClientCache {
+    INSTANCE;
+    private final TwoLevelCacheWithExpiration<KmsClient> cache =
+      new TwoLevelCacheWithExpiration<>();
+
+    private TwoLevelCacheWithExpiration<KmsClient> getCache() {
+      return cache;
+    }
+  }
+
+  private enum KEKWriteCache {
+    INSTANCE;
+    private final TwoLevelCacheWithExpiration<KeyEncryptionKey> cache =
+      new TwoLevelCacheWithExpiration<>();
+
+    private TwoLevelCacheWithExpiration<KeyEncryptionKey> getCache() {
+      return cache;
+    }
+  }
+
+  private enum KEKReadCache {
+    INSTANCE;
+    private final TwoLevelCacheWithExpiration<byte[]> cache =
+      new TwoLevelCacheWithExpiration<>();
+
+    private TwoLevelCacheWithExpiration<byte[]> getCache() {
+      return cache;
+    }
+  }
+
+  static class KeyWithMasterID {
+    private final byte[] keyBytes;
+    private final String masterID;
+
+    KeyWithMasterID(byte[] keyBytes, String masterID) {
+      this.keyBytes = keyBytes;
+      this.masterID = masterID;
+    }
+
+    byte[] getDataKey() {
+      return keyBytes;
+    }
+
+    String getMasterID() {
+      return masterID;
+    }
+  }
+
+  static class KeyEncryptionKey {
+    private final byte[] kekBytes;
+    private final byte[] kekID;
+    private String encodedKekID;
+    private final String encodedWrappedKEK;
+
+    KeyEncryptionKey(byte[] kekBytes, byte[] kekID, String encodedWrappedKEK) {
+      this.kekBytes = kekBytes;
+      this.kekID = kekID;
+      this.encodedWrappedKEK = encodedWrappedKEK;
+    }
+
+    byte[] getBytes() {
+      return kekBytes;
+    }
+
+    byte[] getID() {
+      return kekID;
+    }
+
+    String getEncodedID() {
+      if (null == encodedKekID) {
+        encodedKekID = Base64.getEncoder().encodeToString(kekID);
+      }
+      return encodedKekID;
+    }
+
+    String getEncodedWrappedKEK() {
+      return encodedWrappedKEK;
+    }
+  }
+
+  /**
+   * Key rotation. In the single wrapping mode, decrypts data keys with old master keys, then encrypts
+   * them with new master keys. In the double wrapping mode, decrypts KEKs (key encryption keys) with old
+   * master keys, generates new KEKs and encrypts them with new master keys.
+   * Works only if key material is not stored internally in file footers.
+   * Not supported in local key wrapping mode.
+   * Method can be run by multiple threads, but each thread must work on a different folder.
+   * @param folderPath parent path of Parquet files, whose keys will be rotated
+   * @param hadoopConfig Hadoop configuration
+   * @throws IOException I/O problems
+   * @throws ParquetCryptoRuntimeException General parquet encryption problems
+   * @throws KeyAccessDeniedException No access to master keys
+   * @throws UnsupportedOperationException Master key rotation not supported in the specific configuration
+   */
+  public static void rotateMasterKeys(String folderPath, Configuration hadoopConfig)
+    throws IOException, ParquetCryptoRuntimeException, KeyAccessDeniedException, UnsupportedOperationException {
+
+    if (hadoopConfig.getBoolean(KEY_MATERIAL_INTERNAL_PROPERTY_NAME, KEY_MATERIAL_INTERNAL_DEFAULT)) {
+      throw new UnsupportedOperationException("Key rotation is not supported for internal key material");
+    }
+
+    if (hadoopConfig.getBoolean(WRAP_LOCALLY_PROPERTY_NAME, WRAP_LOCALLY_DEFAULT)) {
+      throw new UnsupportedOperationException("Key rotation is not supported for local key wrapping");
+    }
+
+    // If process wrote files with double-wrapped keys, clean KEK cache (since master keys are changing).
+    // Only once for each key rotation cycle; not for every folder
+    long currentTime = System.currentTimeMillis();
+    synchronized (lastCacheCleanForKeyRotationTimeLock) {
+      if (currentTime - lastCacheCleanForKeyRotationTime > CACHE_CLEAN_PERIOD_FOR_KEY_ROTATION) {
+        KEK_WRITE_CACHE_PER_TOKEN.clear();
+        lastCacheCleanForKeyRotationTime = currentTime;
+      }
+    }
+
+    Path parentPath = new Path(folderPath);
+
+    FileSystem hadoopFileSystem = parentPath.getFileSystem(hadoopConfig);
+    if (!hadoopFileSystem.exists(parentPath) || !hadoopFileSystem.isDirectory(parentPath)) {
+      throw new ParquetCryptoRuntimeException("Couldn't rotate keys - folder doesn't exist or is not a directory: " + folderPath);
+    }
+
+    FileStatus[] parquetFilesInFolder = hadoopFileSystem.listStatus(parentPath, HiddenFileFilter.INSTANCE);
+    if (parquetFilesInFolder.length == 0) {
+      throw new ParquetCryptoRuntimeException("Couldn't rotate keys - no parquet files in folder " + folderPath);
+    }
+
+    for (FileStatus fs : parquetFilesInFolder) {
+      Path parquetFile = fs.getPath();
+
+      FileKeyMaterialStore keyMaterialStore = new HadoopFSKeyMaterialStore(hadoopFileSystem);
+      keyMaterialStore.initialize(parquetFile, hadoopConfig, false);
+      FileKeyUnwrapper fileKeyUnwrapper = new FileKeyUnwrapper(hadoopConfig, parquetFile, keyMaterialStore);
+
+      FileKeyMaterialStore tempKeyMaterialStore = new HadoopFSKeyMaterialStore(hadoopFileSystem);
+      tempKeyMaterialStore.initialize(parquetFile, hadoopConfig, true);
+      FileKeyWrapper fileKeyWrapper = new FileKeyWrapper(hadoopConfig, tempKeyMaterialStore);
+
+      Set<String> fileKeyIdSet = keyMaterialStore.getKeyIDSet();
+
+      // Start with footer key (to get KMS ID, URL, if needed)
+      String keyMaterialString = keyMaterialStore.getKeyMaterial(KeyMaterial.FOOTER_KEY_ID_IN_FILE);
+      KeyWithMasterID key = fileKeyUnwrapper.getDEKandMasterID(KeyMaterial.parse(keyMaterialString));
+      fileKeyWrapper.getEncryptionKeyMetadata(key.getDataKey(), key.getMasterID(), true,
+        KeyMaterial.FOOTER_KEY_ID_IN_FILE);
+
+      fileKeyIdSet.remove(KeyMaterial.FOOTER_KEY_ID_IN_FILE);
+      // Rotate column keys
+      for (String keyIdInFile : fileKeyIdSet) {
+        keyMaterialString = keyMaterialStore.getKeyMaterial(keyIdInFile);
+        key = fileKeyUnwrapper.getDEKandMasterID(KeyMaterial.parse(keyMaterialString));
+        fileKeyWrapper.getEncryptionKeyMetadata(key.getDataKey(), key.getMasterID(), false, keyIdInFile);
+      }
+
+      tempKeyMaterialStore.saveMaterial();
+
+      keyMaterialStore.removeMaterial();
+
+      tempKeyMaterialStore.moveMaterialTo(keyMaterialStore);
+    }
+  }
+
+  /**
+   * Flush any caches that are tied to the (compromised) accessToken
+   * @param accessToken access token
+   */
+  public static void removeCacheEntriesForToken(String accessToken) {
+    KMS_CLIENT_CACHE_PER_TOKEN.removeCacheEntriesForToken(accessToken);
+    KEK_WRITE_CACHE_PER_TOKEN.removeCacheEntriesForToken(accessToken);
+    KEK_READ_CACHE_PER_TOKEN.removeCacheEntriesForToken(accessToken);
+  }
+
+  public static void removeCacheEntriesForAllTokens() {
+    KMS_CLIENT_CACHE_PER_TOKEN.clear();
+    KEK_WRITE_CACHE_PER_TOKEN.clear();
+    KEK_READ_CACHE_PER_TOKEN.clear();
+  }
+
+  /**
+   * Encrypts "key" with "masterKey", using AES-GCM and the "AAD"
+   * @param keyBytes the key to encrypt
+   * @param masterKeyBytes encryption key
+   * @param AAD additional authenticated data
+   * @return base64 encoded encrypted key
+   */
+  public static String encryptKeyLocally(byte[] keyBytes, byte[] masterKeyBytes, byte[] AAD) {
+    AesGcmEncryptor keyEncryptor;
+
+    keyEncryptor = (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, masterKeyBytes);
+
+    byte[] encryptedKey = keyEncryptor.encrypt(false, keyBytes, AAD);
+
+    return Base64.getEncoder().encodeToString(encryptedKey);
+  }
+
+  /**
+   * Decrypts encrypted key with "masterKey", using AES-GCM and the "AAD"
+   * @param encodedEncryptedKey base64 encoded encrypted key
+   * @param masterKeyBytes encryption key
+   * @param AAD additional authenticated data
+   * @return decrypted key
+   */
+  public static byte[] decryptKeyLocally(String encodedEncryptedKey, byte[] masterKeyBytes, byte[] AAD) {
+    byte[] encryptedKey = Base64.getDecoder().decode(encodedEncryptedKey);
+
+    AesGcmDecryptor keyDecryptor;
+
+    keyDecryptor = (AesGcmDecryptor) ModuleCipherFactory.getDecryptor(AesMode.GCM, masterKeyBytes);
+
+    return keyDecryptor.decrypt(encryptedKey, 0, encryptedKey.length, AAD);
+  }
+
+  static KmsClient getKmsClient(String kmsInstanceID, String kmsInstanceURL, Configuration configuration,
+      String accessToken, long cacheEntryLifetime) {
+
+    ConcurrentMap<String, KmsClient> kmsClientPerKmsInstanceCache =
+        KMS_CLIENT_CACHE_PER_TOKEN.getOrCreateInternalCache(accessToken, cacheEntryLifetime);
+
+    KmsClient kmsClient =
+        kmsClientPerKmsInstanceCache.computeIfAbsent(kmsInstanceID,
+            (k) -> createAndInitKmsClient(configuration, kmsInstanceID, kmsInstanceURL, accessToken));
+
+    return kmsClient;
+  }
+
+  private static KmsClient createAndInitKmsClient(Configuration configuration, String kmsInstanceID,
+      String kmsInstanceURL, String accessToken) {
+
+    Class<?> kmsClientClass = null;
+    KmsClient kmsClient = null;
+
+    try {
+      kmsClientClass = ConfigurationUtil.getClassFromConfig(configuration,
+          KMS_CLIENT_CLASS_PROPERTY_NAME, KmsClient.class);
+
+      if (null == kmsClientClass) {
+        throw new ParquetCryptoRuntimeException("Unspecified " + KMS_CLIENT_CLASS_PROPERTY_NAME);
+      }
+      kmsClient = (KmsClient) kmsClientClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException | BadConfigurationException e) {
+      throw new ParquetCryptoRuntimeException("Could not instantiate KmsClient class: "
+          + kmsClientClass, e);
+    }
+
+    kmsClient.initialize(configuration, kmsInstanceID, kmsInstanceURL, accessToken);
+
+    return kmsClient;
+  }
+
+  static String formatTokenForLog(String accessToken) {
+    int maxTokenDisplayLength = 5;
+    if (accessToken.length() <= maxTokenDisplayLength) {
+      return accessToken;
+    }
+    return accessToken.substring(accessToken.length() - maxTokenDisplayLength);
+  }
+
+  static boolean stringIsEmpty(String str) {
+    return (null == str) || str.isEmpty();
+  }
+}
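
For orientation between the file diffs: a minimal sketch of driving the KeyToolkit API above from user code. The property names and method signatures are taken from this patch; the folder path, token string, and the use of the `InMemoryKMS` mock (added under the test tree in this patch) are illustrative assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.crypto.keytools.KeyToolkit;

public class KeyToolkitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // KMS client implementation (here: the in-memory mock from this patch's tests)
    conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME,
        "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");
    // Key rotation requires external key material and server-side wrapping
    conf.setBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, false);
    conf.setBoolean(KeyToolkit.WRAP_LOCALLY_PROPERTY_NAME, false);

    // Rotate the master keys of all Parquet files in a folder (hypothetical path)
    KeyToolkit.rotateMasterKeys("/data/encrypted_table", conf);

    // If an access token is compromised, flush all caches tied to it
    KeyToolkit.removeCacheEntriesForToken("compromised-token");

    // Local wrapping round trip: AES-GCM encrypt a DEK under a 128-bit master key
    byte[] masterKey = new byte[16]; // all-zero key, for illustration only
    byte[] dek = new byte[16];
    byte[] aad = "keyId".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    String wrapped = KeyToolkit.encryptKeyLocally(dek, masterKey, aad);
    byte[] unwrapped = KeyToolkit.decryptKeyLocally(wrapped, masterKey, aad);
    assert java.util.Arrays.equals(dek, unwrapped);
  }
}
```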
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KmsClient.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KmsClient.java
new file mode 100644
index 0000000..d55dd87
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KmsClient.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+
+public interface KmsClient {
+
+  public static final String KMS_INSTANCE_ID_DEFAULT = "DEFAULT";
+  public static final String KMS_INSTANCE_URL_DEFAULT = "DEFAULT";
+  public static final String KEY_ACCESS_TOKEN_DEFAULT = "DEFAULT";
+
+  /**
+   * Pass configuration with KMS-specific parameters.
+   * @param configuration Hadoop configuration
+   * @param kmsInstanceID ID of the KMS instance handled by this KmsClient. Use the default value for KMS systems
+   *                      that don't work with multiple instances.
+   * @param kmsInstanceURL URL of the KMS instance handled by this KmsClient. Use the default value for KMS systems
+   *                      that don't work with URLs.
+   * @param accessToken KMS access (authorization) token. Use the default value for KMS systems that don't work with tokens.
+   * @throws KeyAccessDeniedException unauthorized to initialize the KMS client
+   */
+  public void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken)
+      throws KeyAccessDeniedException;
+
+  /**
+   * Wraps a key - encrypts it with the master key, encodes the result,
+   * and potentially adds KMS-specific metadata.
+   *
+   * If your KMS client code throws runtime exceptions related to access/permission problems
+   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
+   *
+   * @param keyBytes key bytes to be wrapped
+   * @param masterKeyIdentifier a string that uniquely identifies the master key in a KMS instance
+   * @return wrapped key
+   * @throws KeyAccessDeniedException unauthorized to encrypt with the given master key
+   */
+  public String wrapKey(byte[] keyBytes, String masterKeyIdentifier)
+      throws KeyAccessDeniedException;
+
+  /**
+   * Decrypts (unwraps) a key with the master key.
+   *
+   * If your KMS client code throws runtime exceptions related to access/permission problems
+   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
+   *
+   * @param wrappedKey String produced by wrapKey operation
+   * @param masterKeyIdentifier a string that uniquely identifies the master key in a KMS instance
+   * @return unwrapped key bytes
+   * @throws KeyAccessDeniedException unauthorized to unwrap with the given master key
+   */
+  public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier)
+      throws KeyAccessDeniedException;
+}
\ No newline at end of file
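
To illustrate the contract, a bare-bones KmsClient that wraps keys under a single hard-coded master key. This is purely a sketch: the `example.kms.master.key` property and the reuse of KeyToolkit's local AES-GCM wrapping are assumptions, not a real KMS integration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.crypto.KeyAccessDeniedException;
import org.apache.parquet.crypto.keytools.KeyToolkit;
import org.apache.parquet.crypto.keytools.KmsClient;

// Toy client: wraps keys locally with one fixed master key. Not for production.
public class SingleKeyKmsClient implements KmsClient {
  private byte[] masterKey;

  @Override
  public void initialize(Configuration conf, String kmsInstanceID,
      String kmsInstanceURL, String accessToken) throws KeyAccessDeniedException {
    // Assumption: the master key is passed base64-encoded in a custom property
    String encoded = conf.getTrimmed("example.kms.master.key");
    if (null == encoded) {
      throw new KeyAccessDeniedException("No master key configured");
    }
    this.masterKey = Base64.getDecoder().decode(encoded);
  }

  @Override
  public String wrapKey(byte[] keyBytes, String masterKeyIdentifier) {
    // Reuse the toolkit's AES-GCM local wrapping; the key ID serves as AAD
    return KeyToolkit.encryptKeyLocally(keyBytes, masterKey,
        masterKeyIdentifier.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier) {
    return KeyToolkit.decryptKeyLocally(wrappedKey, masterKey,
        masterKeyIdentifier.getBytes(StandardCharsets.UTF_8));
  }
}
```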
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/PropertiesDrivenCryptoFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/PropertiesDrivenCryptoFactory.java
new file mode 100644
index 0000000..8fdeca2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/PropertiesDrivenCryptoFactory.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public class PropertiesDrivenCryptoFactory implements EncryptionPropertiesFactory, DecryptionPropertiesFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(PropertiesDrivenCryptoFactory.class);
+
+  private static final int[] ACCEPTABLE_DATA_KEY_LENGTHS = {128, 192, 256};
+
+  /**
+   * List of columns to encrypt, with master key IDs (see HIVE-21848).
+   * Format: "masterKeyID:colName,colName;masterKeyID:colName..."
+   */
+  public static final String COLUMN_KEYS_PROPERTY_NAME = "parquet.encryption.column.keys";
+  /**
+   * Master key ID for footer encryption/signing.
+   */
+  public static final String FOOTER_KEY_PROPERTY_NAME = "parquet.encryption.footer.key";
+  /**
+   * Parquet encryption algorithm. Can be "AES_GCM_V1" (default), or "AES_GCM_CTR_V1".
+   */
+  public static final String ENCRYPTION_ALGORITHM_PROPERTY_NAME = "parquet.encryption.algorithm";
+  /**
+   * Write files with plaintext footer.
+   * By default, false - Parquet footers are encrypted.
+   */
+  public static final String PLAINTEXT_FOOTER_PROPERTY_NAME = "parquet.encryption.plaintext.footer";
+
+  public static final String ENCRYPTION_ALGORITHM_DEFAULT = ParquetCipher.AES_GCM_V1.toString();
+  public static final boolean PLAINTEXT_FOOTER_DEFAULT = false;
+
+  private static final SecureRandom RANDOM = new SecureRandom();
+
+  @Override
+  public FileEncryptionProperties getFileEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
+      WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
+
+    String footerKeyId = fileHadoopConfig.getTrimmed(FOOTER_KEY_PROPERTY_NAME);
+    String columnKeysStr = fileHadoopConfig.getTrimmed(COLUMN_KEYS_PROPERTY_NAME);
+
+    // File shouldn't be encrypted
+    if (stringIsEmpty(footerKeyId) && stringIsEmpty(columnKeysStr)) {
+      LOG.debug("Unencrypted file: {}", tempFilePath);
+      return null;
+    }
+
+    if (stringIsEmpty(footerKeyId)) {
+      throw new ParquetCryptoRuntimeException("Undefined footer key");
+    }
+
+    FileKeyMaterialStore keyMaterialStore = null;
+    boolean keyMaterialInternalStorage = fileHadoopConfig.getBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME,
+        KeyToolkit.KEY_MATERIAL_INTERNAL_DEFAULT);
+    if (!keyMaterialInternalStorage) {
+      try {
+        keyMaterialStore = new HadoopFSKeyMaterialStore(tempFilePath.getFileSystem(fileHadoopConfig));
+        keyMaterialStore.initialize(tempFilePath, fileHadoopConfig, false);
+      } catch (IOException e) {
+        throw new ParquetCryptoRuntimeException("Failed to get key material store", e);
+      }
+    }
+
+    FileKeyWrapper keyWrapper = new FileKeyWrapper(fileHadoopConfig, keyMaterialStore);
+
+    String algo = fileHadoopConfig.getTrimmed(ENCRYPTION_ALGORITHM_PROPERTY_NAME, ENCRYPTION_ALGORITHM_DEFAULT);
+    ParquetCipher cipher;
+    try {
+      cipher = ParquetCipher.valueOf(algo);
+    } catch (IllegalArgumentException e) {
+      throw new ParquetCryptoRuntimeException("Wrong encryption algorithm: " + algo);
+    }
+
+    int dekLengthBits = fileHadoopConfig.getInt(KeyToolkit.DATA_KEY_LENGTH_PROPERTY_NAME,
+        KeyToolkit.DATA_KEY_LENGTH_DEFAULT);
+
+    if (Arrays.binarySearch(ACCEPTABLE_DATA_KEY_LENGTHS, dekLengthBits) < 0) {
+      throw new ParquetCryptoRuntimeException("Wrong data key length : " + dekLengthBits);
+    }
+
+    int dekLength = dekLengthBits / 8;
+
+    byte[] footerKeyBytes = new byte[dekLength];
+    RANDOM.nextBytes(footerKeyBytes);
+    byte[] footerKeyMetadata = keyWrapper.getEncryptionKeyMetadata(footerKeyBytes, footerKeyId, true);
+
+    Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = getColumnEncryptionProperties(dekLength, columnKeysStr, keyWrapper);
+
+    boolean plaintextFooter = fileHadoopConfig.getBoolean(PLAINTEXT_FOOTER_PROPERTY_NAME, PLAINTEXT_FOOTER_DEFAULT);
+
+    FileEncryptionProperties.Builder propertiesBuilder = FileEncryptionProperties.builder(footerKeyBytes)
+        .withFooterKeyMetadata(footerKeyMetadata)
+        .withAlgorithm(cipher)
+        .withEncryptedColumns(encryptedColumns);
+
+    if (plaintextFooter) {
+      propertiesBuilder = propertiesBuilder.withPlaintextFooter();
+    }
+
+    if (null != keyMaterialStore) {
+      keyMaterialStore.saveMaterial();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("File encryption properties for {} - algo: {}; footer key id: {}; plaintext footer: {}; "
+          + "internal key material: {}; encrypted columns: {}",
+          tempFilePath, cipher, footerKeyId, plaintextFooter, keyMaterialInternalStorage, columnKeysStr);
+    }
+
+    return propertiesBuilder.build();
+  }
+
+  private Map<ColumnPath, ColumnEncryptionProperties> getColumnEncryptionProperties(int dekLength, String columnKeys,
+      FileKeyWrapper keyWrapper) throws ParquetCryptoRuntimeException {
+    if (stringIsEmpty(columnKeys)) {
+      throw new ParquetCryptoRuntimeException("No column keys configured in " + COLUMN_KEYS_PROPERTY_NAME);
+    }
+    Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = new HashMap<>();
+    String[] keyToColumns = columnKeys.split(";");
+    for (int i = 0; i < keyToColumns.length; ++i) {
+      final String curKeyToColumns = keyToColumns[i].trim();
+      if (curKeyToColumns.isEmpty()) {
+        continue;
+      }
+
+      String[] parts = curKeyToColumns.split(":");
+      if (parts.length != 2) {
+        throw new ParquetCryptoRuntimeException("Incorrect key to columns mapping in " + COLUMN_KEYS_PROPERTY_NAME
+            + ": [" + curKeyToColumns + "]");
+      }
+
+      String columnKeyId = parts[0].trim();
+      if (columnKeyId.isEmpty()) {
+        throw new ParquetCryptoRuntimeException("Empty key name in " + COLUMN_KEYS_PROPERTY_NAME);
+      }
+
+      String columnNamesStr = parts[1].trim();
+      String[] columnNames = columnNamesStr.split(",");
+      if (0 == columnNames.length) {
+        throw new ParquetCryptoRuntimeException("No columns to encrypt defined for key: " + columnKeyId);
+      }
+
+      for (int j = 0; j < columnNames.length; ++j) {
+        final String columnName = columnNames[j].trim();
+        if (columnName.isEmpty()) {
+          throw new ParquetCryptoRuntimeException("Empty column name in " + COLUMN_KEYS_PROPERTY_NAME + " for key: " + columnKeyId);
+        }
+
+        final ColumnPath columnPath = ColumnPath.fromDotString(columnName);
+        if (encryptedColumns.containsKey(columnPath)) {
+          throw new ParquetCryptoRuntimeException("Multiple keys defined for the same column: " + columnName);
+        }
+
+        byte[] columnKeyBytes = new byte[dekLength];
+        RANDOM.nextBytes(columnKeyBytes);
+        byte[] columnKeyKeyMetadata = keyWrapper.getEncryptionKeyMetadata(columnKeyBytes, columnKeyId, false);
+
+        ColumnEncryptionProperties cmd = ColumnEncryptionProperties.builder(columnPath)
+            .withKey(columnKeyBytes)
+            .withKeyMetaData(columnKeyKeyMetadata)
+            .build();
+        encryptedColumns.put(columnPath, cmd);
+      }
+    }
+    if (encryptedColumns.isEmpty()) {
+      throw new ParquetCryptoRuntimeException("No column keys configured in " + COLUMN_KEYS_PROPERTY_NAME);
+    }
+
+    return encryptedColumns;
+  }
+
+  @Override
+  public FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath)
+      throws ParquetCryptoRuntimeException {
+
+    DecryptionKeyRetriever keyRetriever = new FileKeyUnwrapper(hadoopConfig, filePath);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("File decryption properties for {}", filePath);
+    }
+
+    return FileDecryptionProperties.builder()
+        .withKeyRetriever(keyRetriever)
+        .withPlaintextFilesAllowed()
+        .build();
+  }
+}
+
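
Putting the factory together with the key-tools properties, a sketch of a Hadoop configuration for the write path (the same `parquet.crypto.factory.class` property drives the read path). The master key IDs `kf`/`kc1` and the column names are borrowed from the tests below; using the `InMemoryKMS` mock is an assumption for local runs.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.crypto.keytools.KeyToolkit;
import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;

public class FactoryConfigSketch {
  public static Configuration encryptionConf() {
    Configuration conf = new Configuration();
    // Activate the properties-driven crypto factory
    conf.set("parquet.crypto.factory.class",
        "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");
    conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME,
        "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");
    // Footer key ID, plus column keys in the "masterKeyID:colName,colName;..." format
    conf.set(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, "kf");
    conf.set(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME,
        "kc1:double_field,float_field");
    // Optional: plaintext footer, readable by legacy readers (off by default)
    conf.setBoolean(PropertiesDrivenCryptoFactory.PLAINTEXT_FOOTER_PROPERTY_NAME, false);
    return conf;
  }
}
```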
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java
new file mode 100644
index 0000000..c8a7435
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public abstract class RemoteKmsClient implements KmsClient {
+
+  public static final String LOCAL_WRAP_NO_KEY_VERSION = "NO_VERSION";
+
+  protected String kmsInstanceID;
+  protected String kmsInstanceURL;
+  protected String kmsToken;
+  protected Boolean isWrapLocally;
+  protected Configuration hadoopConfiguration;
+  protected boolean isDefaultToken;
+
+  // MasterKey cache: master keys per key ID (per KMS client). For local wrapping only.
+  private ConcurrentMap<String, byte[]> masterKeyCache;
+
+  /**
+   * KMS systems wrap keys by encrypting them with master keys, and attaching additional information (such as the version
+   * number of the master key) to the result of the encryption. The master key version is required for key rotation.
+   * Currently, the local wrapping mode does not support key rotation (because not all KMS systems allow fetching a master
+   * key by its ID and version number). Still, the local wrapping mode adds a placeholder for the master key version, which
+   * will enable support for key rotation in this mode in the future, with appropriate KMS systems. This will also enable
+   * backward compatibility, where future readers will be able to extract the master key version from files written by the
+   * current code.
+   *
+   * The LocalKeyWrap class writes (and reads) the "key wrap" as a flat JSON with the following fields:
+   * 1. "masterKeyVersion" - a String with the master key version. In the current version, only one value is allowed - "NO_VERSION".
+   * 2. "encryptedKey" - a String with the key encrypted by the master key (base64-encoded).
+   */
+  static class LocalKeyWrap {
+    public static final String LOCAL_WRAP_KEY_VERSION_FIELD = "masterKeyVersion";
+    public static final String LOCAL_WRAP_ENCRYPTED_KEY_FIELD = "encryptedKey";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private String encryptedEncodedKey;
+    private String masterKeyVersion;
+
+    private LocalKeyWrap(String masterKeyVersion, String encryptedEncodedKey) {
+      this.masterKeyVersion = masterKeyVersion;
+      this.encryptedEncodedKey = encryptedEncodedKey;
+    }
+
+    private static String createSerialized(String encryptedEncodedKey) {
+      Map<String, String> keyWrapMap = new HashMap<>(2);
+      keyWrapMap.put(LOCAL_WRAP_KEY_VERSION_FIELD, LOCAL_WRAP_NO_KEY_VERSION);
+      keyWrapMap.put(LOCAL_WRAP_ENCRYPTED_KEY_FIELD, encryptedEncodedKey);
+      try {
+        return OBJECT_MAPPER.writeValueAsString(keyWrapMap);
+      } catch (IOException e) {
+        throw new ParquetCryptoRuntimeException("Failed to serialize local key wrap map", e);
+      }
+    }
+
+    private static LocalKeyWrap parse(String wrappedKey) {
+      Map<String, String> keyWrapMap = null;
+      try {
+        keyWrapMap = OBJECT_MAPPER.readValue(new StringReader(wrappedKey),
+            new TypeReference<Map<String, String>>() {});
+      } catch (IOException e) {
+        throw new ParquetCryptoRuntimeException("Failed to parse local key wrap json " + wrappedKey, e);
+      }
+      String encryptedEncodedKey = keyWrapMap.get(LOCAL_WRAP_ENCRYPTED_KEY_FIELD);
+      String masterKeyVersion = keyWrapMap.get(LOCAL_WRAP_KEY_VERSION_FIELD);
+
+      return new LocalKeyWrap(masterKeyVersion, encryptedEncodedKey);
+    }
+
+    private String getMasterKeyVersion() {
+      return masterKeyVersion;
+    }
+
+    private String getEncryptedKey() {
+      return encryptedEncodedKey;
+    }
+  }
+
+  @Override
+  public void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken) {
+    this.kmsInstanceID = kmsInstanceID;
+    this.kmsInstanceURL = kmsInstanceURL;
+
+    this.isWrapLocally = configuration.getBoolean(KeyToolkit.WRAP_LOCALLY_PROPERTY_NAME, KeyToolkit.WRAP_LOCALLY_DEFAULT);
+    if (isWrapLocally) {
+      masterKeyCache = new ConcurrentHashMap<>();
+    }
+
+    hadoopConfiguration = configuration;
+    kmsToken = accessToken;
+
+    isDefaultToken = kmsToken.equals(KmsClient.KEY_ACCESS_TOKEN_DEFAULT);
+
+    initializeInternal();
+  }
+
+  @Override
+  public String wrapKey(byte[] key, String masterKeyIdentifier) throws KeyAccessDeniedException {
+    if (isWrapLocally) {
+      byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,
+          (k) -> getKeyFromServer(masterKeyIdentifier));
+      byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+      String encryptedEncodedKey = KeyToolkit.encryptKeyLocally(key, masterKey, AAD);
+      return LocalKeyWrap.createSerialized(encryptedEncodedKey);
+    } else {
+      refreshToken();
+      return wrapKeyInServer(key, masterKeyIdentifier);
+    }
+  }
+
+  @Override
+  public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier) throws KeyAccessDeniedException {
+    if (isWrapLocally) {
+      LocalKeyWrap keyWrap = LocalKeyWrap.parse(wrappedKey);
+      String masterKeyVersion = keyWrap.getMasterKeyVersion();
+      if (!LOCAL_WRAP_NO_KEY_VERSION.equals(masterKeyVersion)) {
+        throw new ParquetCryptoRuntimeException("Master key versions are not supported for local wrapping: "
+          + masterKeyVersion);
+      }
+      String encryptedEncodedKey = keyWrap.getEncryptedKey();
+      byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,
+          (k) -> getKeyFromServer(masterKeyIdentifier));
+      byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+      return KeyToolkit.decryptKeyLocally(encryptedEncodedKey, masterKey, AAD);
+    } else {
+      refreshToken();
+      return unwrapKeyInServer(wrappedKey, masterKeyIdentifier);
+    }
+  }
+
+  private void refreshToken() {
+    if (isDefaultToken) {
+      return;
+    }
+    kmsToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME);
+    if (stringIsEmpty(kmsToken)) {
+      throw new ParquetCryptoRuntimeException("Empty token");
+    }
+  }
+
+  private byte[] getKeyFromServer(String keyIdentifier) {
+    refreshToken();
+    return getMasterKeyFromServer(keyIdentifier);
+  }
+
+  /**
+   * Wrap a key with the master key in the remote KMS server.
+   *
+   * If your KMS client code throws runtime exceptions related to access/permission problems
+   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
+   *
+   * @param keyBytes key bytes to be wrapped
+   * @param masterKeyIdentifier a string that uniquely identifies the master key in a KMS instance
+   * @return wrapped key - the key bytes encrypted with the master key and encoded, potentially with KMS-specific metadata added
+   * @throws KeyAccessDeniedException unauthorized to encrypt with the given master key
+   * @throws UnsupportedOperationException KMS does not support in-server wrapping
+   */
+  protected abstract String wrapKeyInServer(byte[] keyBytes, String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException;
+
+  /**
+   * Unwrap a key with the master key in the remote KMS server.
+   *
+   * If your KMS client code throws runtime exceptions related to access/permission problems
+   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
+   *
+   * @param wrappedKey String produced by wrapKey operation
+   * @param masterKeyIdentifier a string that uniquely identifies the master key in a KMS instance
+   * @return key bytes
+   * @throws KeyAccessDeniedException unauthorized to unwrap with the given master key
+   * @throws UnsupportedOperationException KMS does not support in-server unwrapping
+   */
+  protected abstract byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException;
+
+  /**
+   * Get master key from the remote KMS server.
+   * Required only for local wrapping. No need to implement if KMS supports in-server wrapping/unwrapping.
+   *
+   * If your KMS client code throws runtime exceptions related to access/permission problems
+   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
+   *
+   * @param masterKeyIdentifier a string that uniquely identifies the master key in a KMS instance
+   * @return master key bytes
+   * @throws KeyAccessDeniedException unauthorized to get the master key
+   * @throws UnsupportedOperationException if not implemented, or KMS does not support key fetching
+   */
+  protected abstract byte[] getMasterKeyFromServer(String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException;
+
+  /**
+   * Pass configuration with KMS-specific parameters.
+   */
+  protected abstract void initializeInternal()
+      throws KeyAccessDeniedException;
+}
\ No newline at end of file
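
A sketch of a RemoteKmsClient subclass, to show which of the abstract methods a concrete client has to fill in. Here the "server" is just a static map and in-server wrapping is unsupported (so it only works with `parquet.encryption.wrap.locally=true`); the class name, map contents, and this design are assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.parquet.crypto.KeyAccessDeniedException;
import org.apache.parquet.crypto.keytools.RemoteKmsClient;

// Sketch: a "remote" client supporting only local wrapping; master keys are
// looked up in a static map standing in for the real server.
public class MapBackedRemoteKmsClient extends RemoteKmsClient {
  private static final Map<String, byte[]> SERVER_KEYS = new HashMap<>();
  static {
    SERVER_KEYS.put("kf", "0123456789012345".getBytes(StandardCharsets.UTF_8));
  }

  @Override
  protected void initializeInternal() {
    // A real implementation would open a connection to kmsInstanceURL here
  }

  @Override
  protected String wrapKeyInServer(byte[] keyBytes, String masterKeyIdentifier) {
    throw new UnsupportedOperationException("No in-server wrapping; set "
        + "parquet.encryption.wrap.locally=true");
  }

  @Override
  protected byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier) {
    throw new UnsupportedOperationException("No in-server unwrapping");
  }

  @Override
  protected byte[] getMasterKeyFromServer(String masterKeyIdentifier)
      throws KeyAccessDeniedException {
    byte[] key = SERVER_KEYS.get(masterKeyIdentifier);
    if (null == key) {
      throw new KeyAccessDeniedException(masterKeyIdentifier);
    }
    return key;
  }
}
```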
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/TwoLevelCacheWithExpiration.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/TwoLevelCacheWithExpiration.java
new file mode 100644
index 0000000..b5edbfb
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/TwoLevelCacheWithExpiration.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Concurrent two-level cache with expiration of internal caches according to token lifetime.
+ * External cache is per token, internal is per String key.
+ * Wrapper class around:
+ *   ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, V>>>
+ *
+ * @param <V> Value
+ */
+class TwoLevelCacheWithExpiration<V> {
+
+  private final ConcurrentMap<String, ExpiringCacheEntry<ConcurrentMap<String, V>>> cache;
+  private volatile long lastCacheCleanupTimestamp;
+
+  static class ExpiringCacheEntry<E>  {
+    private final long expirationTimestamp;
+    private final E cachedItem;
+
+    private ExpiringCacheEntry(E cachedItem, long expirationIntervalMillis) {
+      this.expirationTimestamp = System.currentTimeMillis() + expirationIntervalMillis;
+      this.cachedItem = cachedItem;
+    }
+
+    private boolean isExpired() {
+      final long now = System.currentTimeMillis();
+      return (now > expirationTimestamp);
+    }
+
+    private E getCachedItem() {
+      return cachedItem;
+    }
+  }
+
+  TwoLevelCacheWithExpiration() {
+    this.cache = new ConcurrentHashMap<>();
+    this.lastCacheCleanupTimestamp = System.currentTimeMillis();
+  }
+
+  ConcurrentMap<String,V> getOrCreateInternalCache(String accessToken, long cacheEntryLifetime) {
+    ExpiringCacheEntry<ConcurrentMap<String, V>> externalCacheEntry = cache.compute(accessToken, (token, cacheEntry) -> {
+      if ((null == cacheEntry) || cacheEntry.isExpired()) {
+        return new ExpiringCacheEntry<>(new ConcurrentHashMap<String, V>(), cacheEntryLifetime);
+      } else {
+        return cacheEntry;
+      }
+    });
+    return externalCacheEntry.getCachedItem();
+  }
+
+  void removeCacheEntriesForToken(String accessToken) {
+    cache.remove(accessToken);
+  }
+
+  void removeCacheEntriesForAllTokens() {
+    cache.clear();
+  }
+
+  public void checkCacheForExpiredTokens(long cacheCleanupPeriod) {
+    long now = System.currentTimeMillis();
+
+    if (now > (lastCacheCleanupTimestamp + cacheCleanupPeriod)) {
+      synchronized (cache) {
+        if (now > (lastCacheCleanupTimestamp + cacheCleanupPeriod)) {
+          removeExpiredEntriesFromCache();
+          lastCacheCleanupTimestamp = now + cacheCleanupPeriod;
+        }
+      }
+    }
+  }
+
+  public void removeExpiredEntriesFromCache() {
+    cache.values().removeIf(cacheEntry -> cacheEntry.isExpired());
+  }
+
+  public void remove(String accessToken) {
+    cache.remove(accessToken);
+  }
+
+  public void clear() {
+    cache.clear();
+  }
+}
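
A usage sketch for the cache (placed in the same package, since the class is package-private): one inner map per access token, expiring together with the token. The token strings and lifetimes are illustrative.

```java
package org.apache.parquet.crypto.keytools;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the two-level cache: first level per token, second per key ID.
class CacheUsageSketch {
  public static void main(String[] args) {
    TwoLevelCacheWithExpiration<byte[]> kekCache = new TwoLevelCacheWithExpiration<>();
    long lifetime = TimeUnit.MINUTES.toMillis(10);

    // Get (or create) the per-token inner cache, then populate it per key ID
    ConcurrentMap<String, byte[]> perToken =
        kekCache.getOrCreateInternalCache("token-A", lifetime);
    perToken.computeIfAbsent("kek-id-1", id -> new byte[16]);

    // Periodic cleanup of expired tokens, and targeted invalidation
    kekCache.checkCacheForExpiredTokens(TimeUnit.HOURS.toMillis(1));
    kekCache.removeCacheEntriesForToken("token-A");
  }
}
```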
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/SingleRow.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/SingleRow.java
new file mode 100644
index 0000000..3e73d31
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/SingleRow.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.parquet.statistics.RandomValues;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+
+public class SingleRow {
+  private static final int RANDOM_SEED = 42;
+  private static final int FIXED_LENGTH = 10;
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final RandomValues.IntGenerator intGenerator
+    = new RandomValues.IntGenerator(RANDOM_SEED);
+  private static final RandomValues.FloatGenerator floatGenerator
+    = new RandomValues.FloatGenerator(RANDOM_SEED);
+  private static final RandomValues.DoubleGenerator doubleGenerator
+    = new RandomValues.DoubleGenerator(RANDOM_SEED);
+  private static final RandomValues.BinaryGenerator binaryGenerator
+    = new RandomValues.BinaryGenerator(RANDOM_SEED);
+  private static final RandomValues.FixedGenerator fixedBinaryGenerator
+    = new RandomValues.FixedGenerator(RANDOM_SEED, FIXED_LENGTH);
+
+  public static final String BOOLEAN_FIELD_NAME = "boolean_field";
+  public static final String INT32_FIELD_NAME = "int32_field";
+  public static final String FLOAT_FIELD_NAME = "float_field";
+  public static final String DOUBLE_FIELD_NAME = "double_field";
+  public static final String BINARY_FIELD_NAME = "ba_field";
+  public static final String FIXED_LENGTH_BINARY_FIELD_NAME = "flba_field";
+  public static final String PLAINTEXT_INT32_FIELD_NAME = "plain_int32_field";
+
+  private static final MessageType SCHEMA =
+    new MessageType("schema",
+      new PrimitiveType(REQUIRED, BOOLEAN, BOOLEAN_FIELD_NAME),
+      new PrimitiveType(REQUIRED, INT32, INT32_FIELD_NAME),
+      new PrimitiveType(REQUIRED, FLOAT, FLOAT_FIELD_NAME),
+      new PrimitiveType(REQUIRED, DOUBLE, DOUBLE_FIELD_NAME),
+      new PrimitiveType(OPTIONAL, BINARY, BINARY_FIELD_NAME),
+      Types.required(FIXED_LEN_BYTE_ARRAY).length(FIXED_LENGTH).named(FIXED_LENGTH_BINARY_FIELD_NAME),
+      new PrimitiveType(OPTIONAL, INT32, PLAINTEXT_INT32_FIELD_NAME));
+
+  public final boolean boolean_field;
+  public final int int32_field;
+  public final float float_field;
+  public final double double_field;
+  public final byte[] ba_field;
+  public final byte[] flba_field;
+  public final Integer plaintext_int32_field; // Can be null, since it doesn't exist in C++-created files yet.
+
+  public SingleRow(boolean boolean_field,
+                   int int32_field,
+                   float float_field,
+                   double double_field,
+                   byte[] ba_field,
+                   byte[] flba_field,
+                   Integer plaintext_int32_field) {
+    this.boolean_field = boolean_field;
+    this.int32_field = int32_field;
+    this.float_field = float_field;
+    this.double_field = double_field;
+    this.ba_field = ba_field;
+    this.flba_field = flba_field;
+    this.plaintext_int32_field = plaintext_int32_field;
+  }
+
+  public static MessageType getSchema() { return SCHEMA; }
+
+  public static List<SingleRow> generateRandomData(int rowCount) {
+    List<SingleRow> dataList = new ArrayList<>(rowCount);
+    for (int row = 0; row < rowCount; ++row) {
+      SingleRow newRow = new SingleRow(RANDOM.nextBoolean(),
+        intGenerator.nextValue(),  floatGenerator.nextValue(),
+        doubleGenerator.nextValue(), binaryGenerator.nextValue().getBytes(),
+        fixedBinaryGenerator.nextValue().getBytes(), intGenerator.nextValue());
+      dataList.add(newRow);
+    }
+    return dataList;
+  }
+
+  public static List<SingleRow> generateLinearData(int rowCount) {
+    List<SingleRow> dataList = new ArrayList<>(rowCount);
+    String baseStr = "parquet";
+    for (int row = 0; row < rowCount; ++row) {
+      boolean boolean_val = ((row % 2) == 0);
+      float float_val = (float) row * 1.1f;
+      double double_val = (row * 1.1111111);
+
+      byte[] binary_val = null;
+      if ((row % 2) == 0) {
+        char firstChar = (char) ((int) '0' + row / 100);
+        char secondChar = (char) ((int) '0' + (row / 10) % 10);
+        char thirdChar = (char) ((int) '0' + row % 10);
+        binary_val = (baseStr + firstChar + secondChar + thirdChar).getBytes(StandardCharsets.UTF_8);
+      }
+      char[] fixed = new char[FIXED_LENGTH];
+      char[] aChar = Character.toChars(row);
+      Arrays.fill(fixed, aChar[0]);
+
+      SingleRow newRow = new SingleRow(boolean_val,
+        row, float_val, double_val,
+        binary_val, new String(fixed).getBytes(StandardCharsets.UTF_8), null/*plaintext_int32_field*/);
+      dataList.add(newRow);
+    }
+    return dataList;
+  }
+}
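
As a quick usage note, the generators above feed the test that follows, roughly like this (a sketch; the counts are illustrative):

```java
import java.util.List;
import org.apache.parquet.crypto.SingleRow;
import org.apache.parquet.schema.MessageType;

public class SingleRowSketch {
  public static void main(String[] args) {
    MessageType schema = SingleRow.getSchema();
    List<SingleRow> random = SingleRow.generateRandomData(100);  // seeded random values
    List<SingleRow> linear = SingleRow.generateLinearData(100);  // deterministic values
    System.out.println(schema + " rows: " + (random.size() + linear.size()));
  }
}
```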
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
new file mode 100644
index 0000000..a9c078b
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
+import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Base64;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.EFMAGIC;
+import static org.apache.parquet.hadoop.ParquetFileWriter.EF_MAGIC_STR;
+import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+/*
+ * This file contains samples for writing and reading encrypted Parquet files in different
+ * encryption and decryption configurations, set using a properties-driven interface.
+ *
+ * The write sample produces a number of parquet files, each encrypted with a different
+ * encryption configuration as described below.
+ * The name of each file is in the form of:
+ * <encryption-configuration-name>.parquet.encrypted or
+ * NO_ENCRYPTION.parquet for plaintext file.
+ *
+ * The read sample creates a set of decryption configurations and then uses each of them
+ * to read all encrypted files in the input directory.
+ *
+ * The different encryption and decryption configurations are listed below.
+ *
+ * A detailed description of the Parquet Modular Encryption specification can be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The write sample creates files with eight columns in the following
+ * encryption configurations:
+ *
+ *  - ENCRYPT_COLUMNS_AND_FOOTER:   Encrypt two columns and the footer, with different
+ *                                  keys.
+ *  - ENCRYPT_COLUMNS_PLAINTEXT_FOOTER:   Encrypt two columns, with different keys.
+ *                                  Do not encrypt footer (to enable legacy readers)
+ *                                  - plaintext footer mode.
+ *  - ENCRYPT_COLUMNS_AND_FOOTER_CTR:   Encrypt two columns and the footer, with different
+ *                                  keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
+ *  - NO_ENCRYPTION:   Do not encrypt anything
+ *
+ * The read sample uses each of the following decryption configurations to read every
+ * encrypted file in the input directory:
+ *
+ *  - DECRYPT_WITH_KEY_RETRIEVER:   Decrypt using key retriever that holds the keys of
+ *                                  two encrypted columns and the footer key.
+ *  - NO_DECRYPTION:   Do not decrypt anything.
+ */
+@RunWith(Parameterized.class)
+public class TestPropertiesDrivenEncryption {
+  @Parameterized.Parameters(name = "Run {index}: isKeyMaterialInternalStorage={0} isDoubleWrapping={1} isWrapLocally={2}")
+  public static Collection<Object[]> data() {
+    Collection<Object[]> list = new ArrayList<>(8);
+    boolean[] flagValues = { false, true };
+    for (boolean keyMaterialInternalStorage : flagValues) {
+      for (boolean doubleWrapping : flagValues) {
+        for (boolean wrapLocally : flagValues) {
+          Object[] vector = {keyMaterialInternalStorage, doubleWrapping, wrapLocally};
+          list.add(vector);
+        }
+      }
+    }
+    return list;
+  }
+
+  @Parameterized.Parameter // first data value (0) is default
+  public boolean isKeyMaterialInternalStorage;
+
+  @Parameterized.Parameter(value = 1)
+  public boolean isDoubleWrapping;
+
+  @Parameterized.Parameter(value = 2)
+  public boolean isWrapLocally;
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestPropertiesDrivenEncryption.class);
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
+
+  private static final Base64.Encoder encoder = Base64.getEncoder();
+  private static final String FOOTER_MASTER_KEY =
+    encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8));
+  private static final String[] COLUMN_MASTER_KEYS = {
+    encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("1234567890123452".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("1234567890123453".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("1234567890123454".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("1234567890123455".getBytes(StandardCharsets.UTF_8))};
+  private static final String[] COLUMN_MASTER_KEY_IDS = { "kc1", "kc2", "kc3", "kc4", "kc5", "kc6"};
+  private static final String FOOTER_MASTER_KEY_ID = "kf";
+
+  private static final String KEY_LIST =  new StringBuilder()
+    .append(COLUMN_MASTER_KEY_IDS[0]).append(": ").append(COLUMN_MASTER_KEYS[0]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[1]).append(": ").append(COLUMN_MASTER_KEYS[1]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[2]).append(": ").append(COLUMN_MASTER_KEYS[2]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[3]).append(": ").append(COLUMN_MASTER_KEYS[3]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[4]).append(": ").append(COLUMN_MASTER_KEYS[4]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(COLUMN_MASTER_KEYS[5]).append(", ")
+    .append(FOOTER_MASTER_KEY_ID).append(": ").append(FOOTER_MASTER_KEY).toString();
+
+  private static final String NEW_FOOTER_MASTER_KEY =
+    encoder.encodeToString("9123456789012345".getBytes(StandardCharsets.UTF_8));
+  private static final String[] NEW_COLUMN_MASTER_KEYS = {
+    encoder.encodeToString("9234567890123450".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("9234567890123451".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("9234567890123452".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("9234567890123453".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("9234567890123454".getBytes(StandardCharsets.UTF_8)),
+    encoder.encodeToString("9234567890123455".getBytes(StandardCharsets.UTF_8))};
+
+  private static final String NEW_KEY_LIST =  new StringBuilder()
+    .append(COLUMN_MASTER_KEY_IDS[0]).append(": ").append(NEW_COLUMN_MASTER_KEYS[0]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[1]).append(": ").append(NEW_COLUMN_MASTER_KEYS[1]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[2]).append(": ").append(NEW_COLUMN_MASTER_KEYS[2]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[3]).append(": ").append(NEW_COLUMN_MASTER_KEYS[3]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[4]).append(": ").append(NEW_COLUMN_MASTER_KEYS[4]).append(", ")
+    .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(NEW_COLUMN_MASTER_KEYS[5]).append(", ")
+    .append(FOOTER_MASTER_KEY_ID).append(": ").append(NEW_FOOTER_MASTER_KEY).toString();
+
+  private static final String COLUMN_KEY_MAPPING = new StringBuilder()
+    .append(COLUMN_MASTER_KEY_IDS[0]).append(": ").append(SingleRow.DOUBLE_FIELD_NAME).append("; ")
+    .append(COLUMN_MASTER_KEY_IDS[1]).append(": ").append(SingleRow.FLOAT_FIELD_NAME).append("; ")
+    .append(COLUMN_MASTER_KEY_IDS[2]).append(": ").append(SingleRow.BOOLEAN_FIELD_NAME).append("; ")
+    .append(COLUMN_MASTER_KEY_IDS[3]).append(": ").append(SingleRow.INT32_FIELD_NAME).append("; ")
+    .append(COLUMN_MASTER_KEY_IDS[4]).append(": ").append(SingleRow.BINARY_FIELD_NAME).append("; ")
+    .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME)
+    .toString();
+
+  private static final int NUM_THREADS = 4;
+  private static final int WAIT_FOR_WRITE_TO_END_SECONDS = 5;
+  private static final int WAIT_FOR_READ_TO_END_SECONDS = 5;
+
+  private static final boolean PLAINTEXT_FILES_ALLOWED = true;
+
+  private static final int ROW_COUNT = 10000;
+  private static final List<SingleRow> DATA = Collections.unmodifiableList(SingleRow.generateRandomData(ROW_COUNT));
+
+  public enum EncryptionConfiguration {
+    ENCRYPT_COLUMNS_AND_FOOTER {
+      /**
+       * Encrypt two columns and the footer, with different keys.
+       */
+      public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) {
+        Configuration conf = getCryptoProperties(test);
+        setEncryptionKeys(conf);
+        return conf;
+      }
+    },
+    ENCRYPT_COLUMNS_PLAINTEXT_FOOTER {
+      /**
+       * Encrypt two columns, with different keys.
+       * Don't encrypt footer.
+       * (plaintext footer mode, readable by legacy readers)
+       */
+      public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) {
+        Configuration conf = getCryptoProperties(test);
+        setEncryptionKeys(conf);
+        conf.setBoolean(PropertiesDrivenCryptoFactory.PLAINTEXT_FOOTER_PROPERTY_NAME, true);
+        return conf;
+      }
+    },
+    ENCRYPT_COLUMNS_AND_FOOTER_CTR {
+      /**
+       * Encrypt two columns and the footer, with different keys.
+       * Use AES_GCM_CTR_V1 algorithm.
+       */
+      public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) {
+        Configuration conf = getCryptoProperties(test);
+        setEncryptionKeys(conf);
+        conf.set(PropertiesDrivenCryptoFactory.ENCRYPTION_ALGORITHM_PROPERTY_NAME,
+          ParquetCipher.AES_GCM_CTR_V1.toString());
+        return conf;
+      }
+    },
+    NO_ENCRYPTION {
+      /**
+       * Do not encrypt anything
+       */
+      public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) {
+        return null;
+      }
+    };
+
+    public abstract Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test);
+  }
+
+  public enum DecryptionConfiguration {
+    DECRYPT_WITH_KEY_RETRIEVER {
+      /**
+       * Decrypt using key retriever callback that holds the keys
+       * of two encrypted columns and the footer key.
+       */
+      public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) {
+        Configuration conf = getCryptoProperties(test);
+        return conf;
+      }
+    },
+    NO_DECRYPTION {
+      /**
+       * Do not decrypt anything.
+       */
+      public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) {
+        return null;
+      }
+    };
+
+    public abstract Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test);
+  }
+
+  /**
+   * Get Hadoop configuration with configuration properties common to all encryption modes
+   */
+  private static Configuration getCryptoProperties(TestPropertiesDrivenEncryption test) {
+    Configuration conf = new Configuration();
+    conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
+      PropertiesDrivenCryptoFactory.class.getName());
+
+    conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName());
+    conf.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, KEY_LIST);
+    conf.set(InMemoryKMS.NEW_KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST);
+
+    conf.setBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, test.isKeyMaterialInternalStorage);
+    conf.setBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, test.isDoubleWrapping);
+    conf.setBoolean(KeyToolkit.WRAP_LOCALLY_PROPERTY_NAME, test.isWrapLocally);
+    return conf;
+  }
+
+  /**
+   * Set configuration properties to encrypt columns and the footer with different keys
+   */
+  private static void setEncryptionKeys(Configuration conf) {
+    conf.set(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, COLUMN_KEY_MAPPING);
+    conf.set(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, FOOTER_MASTER_KEY_ID);
+  }
+
+  @Test
+  public void testWriteReadEncryptedParquetFiles() throws IOException {
+    Path rootPath = new Path(temporaryFolder.getRoot().getPath());
+    LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", rootPath.toString());
+    LOG.info("Run: isKeyMaterialInternalStorage={} isDoubleWrapping={} isWrapLocally={}",
+      isKeyMaterialInternalStorage, isDoubleWrapping, isWrapLocally);
+    KeyToolkit.removeCacheEntriesForAllTokens();
+    ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS);
+    try {
+      // Write using various encryption configurations.
+      testWriteEncryptedParquetFiles(rootPath, DATA, threadPool);
+      // Read using various decryption configurations.
+      testReadEncryptedParquetFiles(rootPath, DATA, threadPool);
+    } finally {
+      threadPool.shutdown();
+    }
+  }
+
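+  /**
+   * Write an encrypted parquet file for each encryption configuration,
+   * using NUM_THREADS writer threads per configuration.
+   */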
+  private void testWriteEncryptedParquetFiles(Path root, List<SingleRow> data, ExecutorService threadPool) throws IOException {
+    EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
+    for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
+      Path encryptionConfigurationFolderPath = new Path(root, encryptionConfiguration.name());
+      Configuration conf = new Configuration();
+      FileSystem fs = encryptionConfigurationFolderPath.getFileSystem(conf);
+      if (fs.exists(encryptionConfigurationFolderPath)) {
+        fs.delete(encryptionConfigurationFolderPath, true);
+      }
+      fs.mkdirs(encryptionConfigurationFolderPath);
+
+      KeyToolkit.removeCacheEntriesForAllTokens();
+      CountDownLatch latch = new CountDownLatch(NUM_THREADS);
+      for (int i = 0; i < NUM_THREADS; ++i) {
+        final int threadNumber = i;
+        threadPool.execute(() -> {
+          writeEncryptedParquetFile(encryptionConfigurationFolderPath, data, encryptionConfiguration, threadNumber);
+          latch.countDown();
+        });
+      }
+      try {
+        if (!latch.await(WAIT_FOR_WRITE_TO_END_SECONDS, TimeUnit.SECONDS)) {
+          LOG.warn("Timed out waiting for write threads to finish");
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
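+  /**
+   * Write a single parquet file with the given encryption configuration;
+   * any failure is recorded in the error collector.
+   */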
+  private void writeEncryptedParquetFile(Path root, List<SingleRow> data, EncryptionConfiguration encryptionConfiguration,
+                                         int threadNumber) {
+    MessageType schema = SingleRow.getSchema();
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    int pageSize = data.size() / 10;     // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 6 * 5; // Ensure that several row groups will be created
+
+    Path file = new Path(root, getFileName(root, encryptionConfiguration, threadNumber));
+    LOG.info("\nWrite " + file.toString());
+    Configuration conf = encryptionConfiguration.getHadoopConfiguration(this);
+    FileEncryptionProperties fileEncryptionProperties = null;
+    try {
+      if (null == conf) {
+        conf = new Configuration();
+      } else {
+        EncryptionPropertiesFactory cryptoFactory = EncryptionPropertiesFactory.loadFactory(conf);
+        fileEncryptionProperties = cryptoFactory.getFileEncryptionProperties(conf, file, null);
+      }
+    } catch (Exception e) {
+      addErrorToErrorCollectorAndLog("Failed writing " + file.toString(), e,
+        encryptionConfiguration, null);
+      return;
+    }
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+      .withConf(conf)
+      .withWriteMode(OVERWRITE)
+      .withType(schema)
+      .withPageSize(pageSize)
+      .withRowGroupSize(rowGroupSize)
+      .withEncryption(fileEncryptionProperties)
+      .build()) {
+
+      for (SingleRow singleRow : data) {
+        writer.write(
+          f.newGroup()
+            .append(SingleRow.BOOLEAN_FIELD_NAME, singleRow.boolean_field)
+            .append(SingleRow.INT32_FIELD_NAME, singleRow.int32_field)
+            .append(SingleRow.FLOAT_FIELD_NAME, singleRow.float_field)
+            .append(SingleRow.DOUBLE_FIELD_NAME, singleRow.double_field)
+            .append(SingleRow.BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.ba_field))
+            .append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.flba_field))
+            .append(SingleRow.PLAINTEXT_INT32_FIELD_NAME, singleRow.plaintext_int32_field));
+
+      }
+    } catch (Exception e) {
+      addErrorToErrorCollectorAndLog("Failed writing " + file.toString(), e,
+        encryptionConfiguration, null);
+    }
+  }
+
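+  /**
+   * Build the per-thread file name; encrypted files get the ".parquet.encrypted" suffix.
+   */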
+  private Path getFileName(Path root, EncryptionConfiguration encryptionConfiguration, int threadNumber) {
+    String suffix = (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration) ? ".parquet" : ".parquet.encrypted";
+    return new Path(root, encryptionConfiguration.toString() + "_" + threadNumber + suffix);
+  }
+
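+  /**
+   * Read the files with the original master keys, rotate the master keys,
+   * then read the files again with the new keys.
+   */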
+  private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data, ExecutorService threadPool) throws IOException {
+    readFilesMultithreaded(root, data, threadPool, false/*keysRotated*/);
+
+    LOG.info("--> Start master key rotation");
+    Configuration hadoopConfigForRotation =
+      EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER.getHadoopConfiguration(this);
+    hadoopConfigForRotation.set(InMemoryKMS.NEW_KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST);
+    InMemoryKMS.startKeyRotation(hadoopConfigForRotation);
+
+    EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
+    for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
+      if (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration) {
+        continue; // no rotation of plaintext files
+      }
+      Path encryptionConfigurationFolderPath = new Path(root, encryptionConfiguration.name());
+      try {
+        LOG.info("Rotate master keys in folder: "  + encryptionConfigurationFolderPath.toString());
+        KeyToolkit.rotateMasterKeys(encryptionConfigurationFolderPath.toString(), hadoopConfigForRotation);
+      } catch (UnsupportedOperationException e) {
+        if (isKeyMaterialInternalStorage || isWrapLocally) {
+          LOG.info("Key material file not found, as expected");
+        } else {
+          errorCollector.addError(e);
+        }
+        return; // No use in continuing reading if rotation wasn't successful
+      } catch (Exception e) {
+        errorCollector.addError(e);
+        return; // No use in continuing reading if rotation wasn't successful
+      }
+    }
+
+    InMemoryKMS.finishKeyRotation();
+    LOG.info("--> Finish master key rotation");
+
+    LOG.info("--> Read files again with new keys");
+    readFilesMultithreaded(root, data, threadPool, true /*keysRotated*/);
+  }
+
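+  /**
+   * Read all files under each combination of encryption and decryption configuration,
+   * using NUM_THREADS reader threads per combination.
+   */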
+  private void readFilesMultithreaded(Path root, List<SingleRow> data, ExecutorService threadPool, boolean keysRotated) {
+    DecryptionConfiguration[] decryptionConfigurations = DecryptionConfiguration.values();
+    for (DecryptionConfiguration decryptionConfiguration : decryptionConfigurations) {
+      LOG.info("\n\n");
+      LOG.info("==> Decryption configuration {}\n", decryptionConfiguration);
+      Configuration hadoopConfig = decryptionConfiguration.getHadoopConfiguration(this);
+      if (null != hadoopConfig) {
+        KeyToolkit.removeCacheEntriesForAllTokens();
+      }
+
+      EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
+      for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
+        Path encryptionConfigurationFolderPath = new Path(root, encryptionConfiguration.name());
+
+        CountDownLatch latch = new CountDownLatch(NUM_THREADS);
+        for (int i = 0; i < NUM_THREADS; ++i) {
+          final int threadNumber = i;
+          threadPool.execute(() -> {
+            Path file = getFileName(encryptionConfigurationFolderPath, encryptionConfiguration, threadNumber);
+            LOG.info("--> Read file {} {}", file.toString(), encryptionConfiguration);
+            readFileAndCheckResult(hadoopConfig, encryptionConfiguration, decryptionConfiguration,
+              data, file, keysRotated);
+
+            latch.countDown();
+          });
+        }
+        try {
+          if (!latch.await(WAIT_FOR_READ_TO_END_SECONDS, TimeUnit.SECONDS)) {
+            LOG.warn("Timed out waiting for read threads to finish");
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
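+  /**
+   * Read a single file, verify its magic bytes and content, and pass any
+   * exception to checkResult to decide whether it was expected.
+   */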
+  private void readFileAndCheckResult(Configuration hadoopConfig, EncryptionConfiguration encryptionConfiguration,
+                                      DecryptionConfiguration decryptionConfiguration,
+                                      List<SingleRow> data, Path file, boolean keysRotated) {
+    FileDecryptionProperties fileDecryptionProperties = null;
+    if (null == hadoopConfig) {
+      hadoopConfig = new Configuration();
+    } else {
+      DecryptionPropertiesFactory cryptoFactory = DecryptionPropertiesFactory.loadFactory(hadoopConfig);
+      fileDecryptionProperties = cryptoFactory.getFileDecryptionProperties(hadoopConfig, file);
+    }
+
+    // Set schema to only point to the non-encrypted columns
+    if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) &&
+      (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+      hadoopConfig.set("parquet.read.schema", Types.buildMessage()
+        .optional(INT32).named(SingleRow.PLAINTEXT_INT32_FIELD_NAME)
+        .named("FormatTestObject").toString());
+    }
+    if ((encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION) &&
+      (encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+      byte[] magic = new byte[MAGIC.length];
+      try (InputStream is = new FileInputStream(file.toString())) {
+        if (is.read(magic) != magic.length) {
+          throw new RuntimeException("ERROR");
+        }
+        if (!Arrays.equals(EFMAGIC, magic)) {
+          addErrorToErrorCollectorAndLog("File doesn't start with " + EF_MAGIC_STR, encryptionConfiguration, decryptionConfiguration);
+        }
+      } catch (IOException e) {
+        addErrorToErrorCollectorAndLog("Failed to read magic string at the beginning of file", e,
+          encryptionConfiguration, decryptionConfiguration);
+      }
+    }
+
+    if (keysRotated && (null != hadoopConfig.get(InMemoryKMS.KEY_LIST_PROPERTY_NAME))) {
+      hadoopConfig.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST);
+    }
+
+    int rowNum = 0;
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+      .withConf(hadoopConfig)
+      .withDecryption(fileDecryptionProperties)
+      .build()) {
+      for (Group group = reader.read(); group != null; group = reader.read()) {
+        SingleRow rowExpected = data.get(rowNum++);
+
+        // plaintext columns
+        if (rowExpected.plaintext_int32_field != group.getInteger(SingleRow.PLAINTEXT_INT32_FIELD_NAME, 0)) {
+          addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+        }
+        // encrypted columns
+        if (decryptionConfiguration != DecryptionConfiguration.NO_DECRYPTION) {
+          if (rowExpected.boolean_field != group.getBoolean(SingleRow.BOOLEAN_FIELD_NAME, 0)) {
+            addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration);
+          }
+          if (rowExpected.int32_field != group.getInteger(SingleRow.INT32_FIELD_NAME, 0)) {
+            addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+          }
+          if (rowExpected.float_field != group.getFloat(SingleRow.FLOAT_FIELD_NAME, 0)) {
+            addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration);
+          }
+          if (rowExpected.double_field != group.getDouble(SingleRow.DOUBLE_FIELD_NAME, 0)) {
+            addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration);
+          }
+          if ((null != rowExpected.ba_field) &&
+            !Arrays.equals(rowExpected.ba_field, group.getBinary(SingleRow.BINARY_FIELD_NAME, 0).getBytes())) {
+            addErrorToErrorCollectorAndLog("Wrong byte array", encryptionConfiguration, decryptionConfiguration);
+          }
+          if (!Arrays.equals(rowExpected.flba_field,
+            group.getBinary(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME, 0).getBytes())) {
+            addErrorToErrorCollectorAndLog("Wrong fixed-length byte array",
+              encryptionConfiguration, decryptionConfiguration);
+          }
+        }
+      }
+    } catch (Exception e) {
+      checkResult(file.getName(), decryptionConfiguration, e);
+    }
+    hadoopConfig.unset("parquet.read.schema");
+  }
+
+  /**
+   * Check that the decryption result is as expected.
+   */
+  private void checkResult(String file, DecryptionConfiguration decryptionConfiguration, Exception exception) {
+    String errorMessage = exception.getMessage();
+    String exceptionMsg = (null == errorMessage ? exception.getClass().getName() : errorMessage);
+    // Extract the encryption configuration from the parquet file name.
+    EncryptionConfiguration encryptionConfiguration = getEncryptionConfigurationFromFilename(file);
+
+    if (!PLAINTEXT_FILES_ALLOWED) {
+      // The encryption configuration has a null encryptor, so the file is written in plaintext.
+      // An exception is expected to be thrown if such a file is being decrypted.
+      if (encryptionConfiguration == EncryptionConfiguration.NO_ENCRYPTION) {
+        if (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER) {
+          if (!exceptionMsg.endsWith("Applying decryptor on plaintext file")) {
+            addErrorToErrorCollectorAndLog("Expecting exception Applying decryptor on plaintext file",
+              exceptionMsg, encryptionConfiguration, decryptionConfiguration);
+          } else {
+            LOG.info("Exception as expected: " + exceptionMsg);
+          }
+          return;
+        }
+      }
+    }
+    // Decryption configuration is null, so only plaintext file can be read. An exception is expected to
+    // be thrown if the file is encrypted.
+    if (decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) {
+      if ((encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION &&
+        encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+        if (!exceptionMsg.endsWith("No encryption key list") && !exceptionMsg.endsWith("No keys available")) {
+          addErrorToErrorCollectorAndLog("Expecting  No keys available exception", exceptionMsg,
+            encryptionConfiguration, decryptionConfiguration);
+        } else {
+          LOG.info("Exception as expected: " + exceptionMsg);
+        }
+        return;
+      }
+    }
+    exception.printStackTrace();
+    addErrorToErrorCollectorAndLog("Didn't expect an exception", exceptionMsg,
+        encryptionConfiguration, decryptionConfiguration);
+  }
+
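+  /**
+   * Recover the encryption configuration from the file name created by getFileName.
+   */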
+  private EncryptionConfiguration getEncryptionConfigurationFromFilename(String file) {
+    if (!file.endsWith(".parquet.encrypted")) {
+      return null;
+    }
+    String fileNamePrefix = file.replaceFirst("(.*)_[0-9]+\\.parquet\\.encrypted", "$1");
+    try {
+      return EncryptionConfiguration.valueOf(fileNamePrefix.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      LOG.error("File name doesn't match any known encryption configuration: " + file);
+      synchronized (errorCollector) {
+        errorCollector.addError(e);
+      }
+      return null;
+    }
+  }
+
+  private void addErrorToErrorCollectorAndLog(String errorMessage, String exceptionMessage, EncryptionConfiguration encryptionConfiguration,
+                                              DecryptionConfiguration decryptionConfiguration) {
+    String fullErrorMessage = String.format("%s - %s Error: %s, but got [%s]",
+      encryptionConfiguration, decryptionConfiguration, errorMessage, exceptionMessage);
+
+    synchronized (errorCollector) {
+      errorCollector.addError(new Throwable(fullErrorMessage));
+    }
+    LOG.error(fullErrorMessage);
+  }
+
+  private void addErrorToErrorCollectorAndLog(String errorMessage, EncryptionConfiguration encryptionConfiguration,
+                                                     DecryptionConfiguration decryptionConfiguration) {
+    String fullErrorMessage = String.format("%s - %s Error: %s",
+      encryptionConfiguration, decryptionConfiguration, errorMessage);
+
+    synchronized (errorCollector) {
+      errorCollector.addError(new Throwable(fullErrorMessage));
+    }
+    LOG.error(fullErrorMessage);
+  }
+
+  private void addErrorToErrorCollectorAndLog(String errorMessage, Throwable exception,
+                                              EncryptionConfiguration encryptionConfiguration,
+                                              DecryptionConfiguration decryptionConfiguration) {
+    String errorMessageWithExceptionDetails = String.format("%s %s %s", errorMessage, exception.getClass().getName(),
+      exception.getMessage());
+    addErrorToErrorCollectorAndLog(errorMessageWithExceptionDetails,
+      encryptionConfiguration, decryptionConfiguration);
+    exception.printStackTrace();
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
new file mode 100644
index 0000000..1aa1b14
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools.mocks;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.RemoteKmsClient;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a mock class, built for testing only. Don't use it as an example of KmsClient implementation.
+ * (VaultClient is the sample implementation).
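+ * <p>
+ * A minimal configuration sketch (property names as defined in this class and in KeyToolkit):
+ * <pre>
+ *   conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName());
+ *   conf.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, "kf: &lt;base64-encoded key&gt;");
+ * </pre>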
+ */
+public class InMemoryKMS extends RemoteKmsClient {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryKMS.class);
+
+  public static final String KEY_LIST_PROPERTY_NAME = "parquet.encryption.key.list";
+  public static final String NEW_KEY_LIST_PROPERTY_NAME = "parquet.encryption.new.key.list";
+
+  private static Map<String,byte[]> masterKeyMap;
+  private static Map<String,byte[]> newMasterKeyMap;
+
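+  /**
+   * Start key rotation: pick up the new master keys from the configuration.
+   * Wrapping uses the new keys immediately; unwrapping keeps using the old
+   * keys until finishKeyRotation() is called.
+   */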
+  public static synchronized void startKeyRotation(Configuration hadoopConfiguration) {
+    String[] newMasterKeys = hadoopConfiguration.getTrimmedStrings(NEW_KEY_LIST_PROPERTY_NAME);
+    if (null == newMasterKeys || newMasterKeys.length == 0) {
+      throw new ParquetCryptoRuntimeException("No encryption key list");
+    }
+    newMasterKeyMap = parseKeyList(newMasterKeys);
+  }
+
+  public static synchronized void finishKeyRotation() {
+    masterKeyMap = newMasterKeyMap;
+  }
+
+  @Override
+  protected synchronized void initializeInternal() {
+    // Parse master keys
+    String[] masterKeys = hadoopConfiguration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME);
+    if (null == masterKeys || masterKeys.length == 0) {
+      throw new ParquetCryptoRuntimeException("No encryption key list");
+    }
+    masterKeyMap = parseKeyList(masterKeys);
+
+    newMasterKeyMap = masterKeyMap;
+  }
+
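+  /**
+   * Parse "keyID: base64EncodedKey" entries into a map of key ID to key bytes.
+   */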
+  private static Map<String, byte[]> parseKeyList(String[] masterKeys) {
+    Map<String,byte[]> keyMap = new HashMap<>();
+
+    for (String masterKey : masterKeys) {
+      String[] parts = masterKey.split(":");
+      String keyName = parts[0].trim();
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Key '" + keyName + "' is not formatted correctly");
+      }
+      String key = parts[1].trim();
+      try {
+        byte[] keyBytes = Base64.getDecoder().decode(key);
+        keyMap.put(keyName, keyBytes);
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Could not decode key '" + keyName + "'!");
+        throw e;
+      }
+    }
+    return keyMap;
+  }
+
+  @Override
+  protected synchronized String wrapKeyInServer(byte[] keyBytes, String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException {
+
+    // Always use the latest key version for writing
+    byte[] masterKey = newMasterKeyMap.get(masterKeyIdentifier);
+    if (null == masterKey) {
+      throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier);
+    }
+    byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+    return KeyToolkit.encryptKeyLocally(keyBytes, masterKey, AAD);
+  }
+
+  @Override
+  protected synchronized byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException {
+    byte[] masterKey = masterKeyMap.get(masterKeyIdentifier);
+    if (null == masterKey) {
+      throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier);
+    }
+    byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
+    return KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, AAD);
+  }
+
+  @Override
+  protected synchronized byte[] getMasterKeyFromServer(String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException {
+    // Always return the latest key version
+    byte[] masterKey = newMasterKeyMap.get(masterKeyIdentifier);
+    if (null == masterKey) {
+      throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier);
+    }
+    return masterKey;
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
new file mode 100755
index 0000000..9d26cc1
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools.samples;
+
+import okhttp3.ConnectionSpec;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KmsClient;
+import org.apache.parquet.crypto.keytools.RemoteKmsClient;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An example of KmsClient implementation. Not for production use!
+ */
+public class VaultClient extends RemoteKmsClient {
+  private static final Logger LOG = LoggerFactory.getLogger(VaultClient.class);
+  private static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
+  private static final String DEFAULT_TRANSIT_ENGINE = "/v1/transit/";
+  private static final String TRANSIT_WRAP_ENDPOINT = "encrypt/";
+  private static final String TRANSIT_UNWRAP_ENDPOINT = "decrypt/";
+  private static final String TOKEN_HEADER = "X-Vault-Token";
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private String endPointPrefix;
+  private OkHttpClient httpClient = new OkHttpClient.Builder()
+    .connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
+    .build();
+
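+  /**
+   * Build the transit engine endpoint prefix from the Vault URL and the
+   * optional KMS instance ID (which selects a non-default transit engine).
+   */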
+  @Override
+  protected void initializeInternal() {
+    if (isDefaultToken) {
+      throw new ParquetCryptoRuntimeException("Vault token not provided");
+    }
+
+    if (kmsInstanceURL.equals(KmsClient.KMS_INSTANCE_URL_DEFAULT)) {
+      throw new ParquetCryptoRuntimeException("Vault URL not provided");
+    }
+
+    if (!kmsInstanceURL.endsWith("/")) {
+      kmsInstanceURL += "/";
+    }
+
+    String transitEngine = DEFAULT_TRANSIT_ENGINE;
+    if (!kmsInstanceID.equals(KmsClient.KMS_INSTANCE_ID_DEFAULT)) {
+      transitEngine = "/v1/" + kmsInstanceID;
+      if (!transitEngine.endsWith("/")) {
+        transitEngine += "/";
+      }
+    }
+
+    endPointPrefix = kmsInstanceURL + transitEngine;
+  }
+
+  @Override
+  public String wrapKeyInServer(byte[] dataKey, String masterKeyIdentifier) {
+    Map<String, String> writeKeyMap = new HashMap<String, String>(1);
+    final String dataKeyStr = Base64.getEncoder().encodeToString(dataKey);
+    writeKeyMap.put("plaintext", dataKeyStr);
+    String response = getContentFromTransitEngine(endPointPrefix + transitWrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+    String ciphertext = parseReturn(response, "ciphertext");
+    return ciphertext;
+  }
+
+  @Override
+  public byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier) {
+    Map<String, String> writeKeyMap = new HashMap<String, String>(1);
+    writeKeyMap.put("ciphertext", wrappedKey);
+    String response = getContentFromTransitEngine(endPointPrefix + transitUnwrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+    String plaintext = parseReturn(response, "plaintext");
+    final byte[] key = Base64.getDecoder().decode(plaintext);
+    return key;
+  }
+
+  @Override
+  protected byte[] getMasterKeyFromServer(String masterKeyIdentifier) {
+    // Vault supports in-server wrapping and unwrapping. No need to fetch master keys.
+    throw new UnsupportedOperationException("Use server wrap/unwrap, instead of fetching master keys (local wrap)");
+  }
+
+  private String buildPayload(Map<String, String> paramMap) {
+    String jsonValue;
+    try {
+      jsonValue = objectMapper.writeValueAsString(paramMap);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to build payload", e);
+    }
+    return jsonValue;
+  }
+
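+  /**
+   * POST the JSON payload to the given transit engine endpoint, for the given master key.
+   */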
+  private String getContentFromTransitEngine(String endPoint, String jPayload, String masterKeyIdentifier) {
+    LOG.info("masterKeyIdentifier: " + masterKeyIdentifier);
+    String masterKeyID = masterKeyIdentifier;
+
+    final RequestBody requestBody = RequestBody.create(JSON_MEDIA_TYPE, jPayload);
+    Request request = new Request.Builder()
+        .url(endPoint + masterKeyID)
+        .header(tokenHeader,  kmsToken)
+        .post(requestBody).build();
+
+    return executeAndGetResponse(endPoint, request);
+  }
+
+  private String executeAndGetResponse(String endPoint, Request request) {
+    Response response = null;
+    try {
+      response = httpClient.newCall(request).execute();
+      final String responseBody = response.body().string();
+      if (response.isSuccessful()) {
+        return responseBody;
+      } else {
+        if ((401 == response.code()) || (403 == response.code())) {
+          throw new KeyAccessDeniedException(responseBody);
+        }
+        throw new IOException("Vault call [" + endPoint + "] didn't succeed: " + responseBody);
+      }
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Vault call [" + request.url().toString() + endPoint + "] didn't succeed", e);
+    } finally {
+      if (null != response) {
+        response.close();
+      }
+    }
+  }
+
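+  /**
+   * Extract the string value of searchKey from the JSON response.
+   */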
+  private static String parseReturn(String response, String searchKey) {
+    String matchingValue;
+    try {
+      JsonNode matchingNode = objectMapper.readTree(response).findValue(searchKey);
+      matchingValue = (null == matchingNode) ? null : matchingNode.getTextValue();
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to parse vault response. " + searchKey + " not found. " + response, e);
+    }
+
+    if (null == matchingValue) {
+      throw new ParquetCryptoRuntimeException("Failed to match vault response. " + searchKey + " not found. " + response);
+    }
+    return matchingValue;
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
index 7f0111d..12f7ff5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
@@ -27,6 +27,7 @@
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.crypto.ParquetCipher;
 import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.SingleRow;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -34,9 +35,7 @@
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
-import org.apache.parquet.statistics.RandomValues;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ErrorCollector;
@@ -46,7 +45,6 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,14 +53,8 @@
 import java.util.Random;
 
 import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
-import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
 /*
  * This file contains samples for writing and reading encrypted Parquet files in different
@@ -134,19 +126,6 @@
   public ErrorCollector errorCollector = new ErrorCollector();
 
   private static String PARQUET_TESTING_PATH = "../submodules/parquet-testing/data";
-  private static final int RANDOM_SEED = 42;
-  private static final int FIXED_LENGTH = 10;
-  private static final Random RANDOM = new Random(RANDOM_SEED);
-  private static final RandomValues.IntGenerator intGenerator
-    = new RandomValues.IntGenerator(RANDOM_SEED);
-  private static final RandomValues.FloatGenerator floatGenerator
-    = new RandomValues.FloatGenerator(RANDOM_SEED);
-  private static final RandomValues.DoubleGenerator doubleGenerator
-    = new RandomValues.DoubleGenerator(RANDOM_SEED);
-  private static final RandomValues.BinaryGenerator binaryGenerator
-    = new RandomValues.BinaryGenerator(RANDOM_SEED);
-  private static final RandomValues.FixedGenerator fixedBinaryGenerator
-    = new RandomValues.FixedGenerator(RANDOM_SEED, FIXED_LENGTH);
 
   private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
   private static final byte[][] COLUMN_ENCRYPTION_KEYS = { "1234567890123450".getBytes(),
@@ -155,30 +134,15 @@
   private static final String[] COLUMN_ENCRYPTION_KEY_IDS = { "kc1", "kc2", "kc3", "kc4", "kc5", "kc6"};
   private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
   private static final String AAD_PREFIX_STRING = "tester";
-  private static final String BOOLEAN_FIELD_NAME = "boolean_field";
-  private static final String INT32_FIELD_NAME = "int32_field";
-  private static final String FLOAT_FIELD_NAME = "float_field";
-  private static final String DOUBLE_FIELD_NAME = "double_field";
-  private static final String BINARY_FIELD_NAME = "ba_field";
-  private static final String FIXED_LENGTH_BINARY_FIELD_NAME = "flba_field";
-  private static final String PLAINTEXT_INT32_FIELD_NAME = "plain_int32_field";
 
   private static final byte[] footerKeyMetadata = FOOTER_ENCRYPTION_KEY_ID.getBytes(StandardCharsets.UTF_8);
   private static final byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8);
 
   private static final int ROW_COUNT = 10000;
-  private static final List<SingleRow> DATA = Collections.unmodifiableList(generateRandomData(ROW_COUNT));
-  private static final List<SingleRow> LINEAR_DATA = Collections.unmodifiableList(generateLinearData(250));
+  private static final List<SingleRow> DATA = Collections.unmodifiableList(SingleRow.generateRandomData(ROW_COUNT));
+  private static final List<SingleRow> LINEAR_DATA = Collections.unmodifiableList(SingleRow.generateLinearData(250));
 
-  private static final MessageType SCHEMA =
-    new MessageType("schema",
-      new PrimitiveType(REQUIRED, BOOLEAN, BOOLEAN_FIELD_NAME),
-      new PrimitiveType(REQUIRED, INT32, INT32_FIELD_NAME),
-      new PrimitiveType(REQUIRED, FLOAT, FLOAT_FIELD_NAME),
-      new PrimitiveType(REQUIRED, DOUBLE, DOUBLE_FIELD_NAME),
-      new PrimitiveType(OPTIONAL, BINARY, BINARY_FIELD_NAME),
-      Types.required(FIXED_LEN_BYTE_ARRAY).length(FIXED_LENGTH).named(FIXED_LENGTH_BINARY_FIELD_NAME),
-      new PrimitiveType(OPTIONAL, INT32, PLAINTEXT_INT32_FIELD_NAME));
+  private static final MessageType SCHEMA = SingleRow.getSchema();
 
   private static final DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
     .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
@@ -344,71 +308,6 @@
     testInteropReadEncryptedParquetFiles(rootPath, true/*readOnlyEncrypted*/, LINEAR_DATA);
   }
 
-  private static List<SingleRow> generateRandomData(int rowCount) {
-    List<SingleRow> dataList = new ArrayList<>(rowCount);
-    for (int row = 0; row < rowCount; ++row) {
-      SingleRow newRow = new SingleRow(RANDOM.nextBoolean(),
-        intGenerator.nextValue(),  floatGenerator.nextValue(),
-        doubleGenerator.nextValue(), binaryGenerator.nextValue().getBytes(),
-        fixedBinaryGenerator.nextValue().getBytes(), intGenerator.nextValue());
-      dataList.add(newRow);
-    }
-    return dataList;
-  }
-
-  private static List<SingleRow> generateLinearData(int rowCount) {
-    List<SingleRow> dataList = new ArrayList<>(rowCount);
-    String baseStr = "parquet";
-    for (int row = 0; row < rowCount; ++row) {
-      boolean boolean_val = ((row % 2) == 0) ? true : false;
-      float float_val = (float) row * 1.1f;
-      double double_val = (row * 1.1111111);
-
-      byte[] binary_val = null;
-      if ((row % 2) == 0) {
-        char firstChar = (char) ((int) '0' + row / 100);
-        char secondChar = (char) ((int) '0' + (row / 10) % 10);
-        char thirdChar = (char) ((int) '0' + row % 10);
-        binary_val = (baseStr + firstChar + secondChar + thirdChar).getBytes(StandardCharsets.UTF_8);
-      }
-      char[] fixed = new char[FIXED_LENGTH];
-      char[] aChar = Character.toChars(row);
-      Arrays.fill(fixed, aChar[0]);
-
-      SingleRow newRow = new SingleRow(boolean_val,
-        row, float_val, double_val,
-        binary_val, new String(fixed).getBytes(StandardCharsets.UTF_8), null/*plaintext_int32_field*/);
-      dataList.add(newRow);
-    }
-    return dataList;
-  }
-
-  public static class SingleRow {
-    public final boolean boolean_field;
-    public final int int32_field;
-    public final float float_field;
-    public final double double_field;
-    public final byte[] ba_field;
-    public final byte[] flba_field;
-    public final Integer plaintext_int32_field; // Can be null, since it doesn't exist in C++-created files yet.
-
-    public SingleRow(boolean boolean_field,
-                     int int32_field,
-                     float float_field,
-                     double double_field,
-                     byte[] ba_field,
-                     byte[] flba_field,
-                     Integer plaintext_int32_field) {
-      this.boolean_field = boolean_field;
-      this.int32_field = int32_field;
-      this.float_field = float_field;
-      this.double_field = double_field;
-      this.ba_field = ba_field;
-      this.flba_field = flba_field;
-      this.plaintext_int32_field = plaintext_int32_field;
-    }
-  }
-
   private void testWriteEncryptedParquetFiles(Path root, List<SingleRow> data) throws IOException {
     Configuration conf = new Configuration();
 
@@ -434,13 +333,13 @@
         for (SingleRow singleRow : data) {
           writer.write(
             f.newGroup()
-              .append(BOOLEAN_FIELD_NAME, singleRow.boolean_field)
-              .append(INT32_FIELD_NAME, singleRow.int32_field)
-              .append(FLOAT_FIELD_NAME, singleRow.float_field)
-              .append(DOUBLE_FIELD_NAME, singleRow.double_field)
-              .append(BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.ba_field))
-              .append(FIXED_LENGTH_BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.flba_field))
-              .append(PLAINTEXT_INT32_FIELD_NAME, singleRow.plaintext_int32_field));
+              .append(SingleRow.BOOLEAN_FIELD_NAME, singleRow.boolean_field)
+              .append(SingleRow.INT32_FIELD_NAME, singleRow.int32_field)
+              .append(SingleRow.FLOAT_FIELD_NAME, singleRow.float_field)
+              .append(SingleRow.DOUBLE_FIELD_NAME, singleRow.double_field)
+              .append(SingleRow.BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.ba_field))
+              .append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.flba_field))
+              .append(SingleRow.PLAINTEXT_INT32_FIELD_NAME, singleRow.plaintext_int32_field));
 
         }
       }
@@ -467,7 +366,7 @@
         if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) &&
           (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
           conf.set("parquet.read.schema", Types.buildMessage()
-            .optional(INT32).named(PLAINTEXT_INT32_FIELD_NAME)
+            .optional(INT32).named(SingleRow.PLAINTEXT_INT32_FIELD_NAME)
             .named("FormatTestObject").toString());
         }
 
@@ -479,38 +378,38 @@
           for (Group group = reader.read(); group != null; group = reader.read()) {
             SingleRow rowExpected = data.get(rowNum++);
             // plaintext columns
-            if (rowExpected.plaintext_int32_field != group.getInteger(PLAINTEXT_INT32_FIELD_NAME, 0)) {
+            if (rowExpected.plaintext_int32_field != group.getInteger(SingleRow.PLAINTEXT_INT32_FIELD_NAME, 0)) {
               addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
             }
             // encrypted columns
             if (decryptionConfiguration != DecryptionConfiguration.NO_DECRYPTION) {
-              if (rowExpected.boolean_field != group.getBoolean(BOOLEAN_FIELD_NAME, 0)) {
+              if (rowExpected.boolean_field != group.getBoolean(SingleRow.BOOLEAN_FIELD_NAME, 0)) {
                 addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration);
               }
-              if (rowExpected.int32_field != group.getInteger(INT32_FIELD_NAME, 0)) {
+              if (rowExpected.int32_field != group.getInteger(SingleRow.INT32_FIELD_NAME, 0)) {
                 addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
               }
-              if (rowExpected.float_field != group.getFloat(FLOAT_FIELD_NAME, 0)) {
+              if (rowExpected.float_field != group.getFloat(SingleRow.FLOAT_FIELD_NAME, 0)) {
                 addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration);
               }
-              if (rowExpected.double_field != group.getDouble(DOUBLE_FIELD_NAME, 0)) {
+              if (rowExpected.double_field != group.getDouble(SingleRow.DOUBLE_FIELD_NAME, 0)) {
                 addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration);
               }
               if ((null != rowExpected.ba_field) &&
-                !Arrays.equals(rowExpected.ba_field, group.getBinary(BINARY_FIELD_NAME, 0).getBytes())) {
+                !Arrays.equals(rowExpected.ba_field, group.getBinary(SingleRow.BINARY_FIELD_NAME, 0).getBytes())) {
                 addErrorToErrorCollectorAndLog("Wrong byte array", encryptionConfiguration, decryptionConfiguration);
               }
               if (!Arrays.equals(rowExpected.flba_field,
-                group.getBinary(FIXED_LENGTH_BINARY_FIELD_NAME, 0).getBytes())) {
+                group.getBinary(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME, 0).getBytes())) {
                 addErrorToErrorCollectorAndLog("Wrong fixed-length byte array",
                   encryptionConfiguration, decryptionConfiguration);
               }
             }
           }
         } catch (ParquetCryptoRuntimeException e) {
-          String errorMessage = e.getMessage();
-          checkResult(file.getName(), decryptionConfiguration, (null == errorMessage ? e.toString() : errorMessage));
+          checkResult(file.getName(), decryptionConfiguration, e);
         } catch (Exception e) {
+          e.printStackTrace();
           addErrorToErrorCollectorAndLog(
             "Unexpected exception: " + e.getClass().getName() + " with message: " + e.getMessage(),
             encryptionConfiguration, decryptionConfiguration);
@@ -539,8 +438,8 @@
         if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) &&
           (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
           conf.set("parquet.read.schema", Types.buildMessage()
-            .required(BOOLEAN).named(BOOLEAN_FIELD_NAME)
-            .required(INT32).named(INT32_FIELD_NAME)
+            .required(BOOLEAN).named(SingleRow.BOOLEAN_FIELD_NAME)
+            .required(INT32).named(SingleRow.INT32_FIELD_NAME)
             .named("FormatTestObject").toString());
         }
 
@@ -552,26 +451,26 @@
           for (Group group = reader.read(); group != null; group = reader.read()) {
             SingleRow rowExpected = data.get(rowNum++);
             // plaintext columns
-            if (rowExpected.boolean_field != group.getBoolean(BOOLEAN_FIELD_NAME, 0)) {
+            if (rowExpected.boolean_field != group.getBoolean(SingleRow.BOOLEAN_FIELD_NAME, 0)) {
               addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration);
             }
-            if (rowExpected.int32_field != group.getInteger(INT32_FIELD_NAME, 0)) {
+            if (rowExpected.int32_field != group.getInteger(SingleRow.INT32_FIELD_NAME, 0)) {
               addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
             }
             // encrypted columns
             if (decryptionConfiguration != DecryptionConfiguration.NO_DECRYPTION) {
-              if (rowExpected.float_field != group.getFloat(FLOAT_FIELD_NAME, 0)) {
+              if (rowExpected.float_field != group.getFloat(SingleRow.FLOAT_FIELD_NAME, 0)) {
                 addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration);
               }
-              if (rowExpected.double_field != group.getDouble(DOUBLE_FIELD_NAME, 0)) {
+              if (rowExpected.double_field != group.getDouble(SingleRow.DOUBLE_FIELD_NAME, 0)) {
                 addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration);
               }
             }
           }
         } catch (ParquetCryptoRuntimeException e) {
-          String errorMessage = e.getMessage();
-          checkResult(file.getName(), decryptionConfiguration, (null == errorMessage ? e.toString() : errorMessage));
+          checkResult(file.getName(), decryptionConfiguration, e);
         } catch (Exception e) {
+          e.printStackTrace();
           addErrorToErrorCollectorAndLog(
             "Unexpected exception: " + e.getClass().getName() + " with message: " + e.getMessage(),
             encryptionConfiguration, decryptionConfiguration);
@@ -585,7 +484,10 @@
   /**
    * Check that the decryption result is as expected.
    */
-  private void checkResult(String file, DecryptionConfiguration decryptionConfiguration, String exceptionMsg) {
+  private void checkResult(String file, DecryptionConfiguration decryptionConfiguration,
+                           ParquetCryptoRuntimeException exception) {
+    String errorMessage = exception.getMessage();
+    String exceptionMsg = (null == errorMessage ? exception.toString() : errorMessage);
     // Extract encryptionConfigurationNumber from the parquet file name.
     EncryptionConfiguration encryptionConfiguration = getEncryptionConfigurationFromFilename(file);
 
@@ -648,10 +550,9 @@
         return;
       }
     }
-    if (null != exceptionMsg && !exceptionMsg.isEmpty()) {
-      addErrorToErrorCollectorAndLog("Didn't expect an exception", exceptionMsg,
-        encryptionConfiguration, decryptionConfiguration);
-    }
+    exception.printStackTrace();
+    addErrorToErrorCollectorAndLog("Didn't expect an exception", exceptionMsg,
+      encryptionConfiguration, decryptionConfiguration);
   }
 
   private EncryptionConfiguration getEncryptionConfigurationFromFilename(String file) {
@@ -691,42 +592,42 @@
     Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
 
     ColumnEncryptionProperties columnPropertiesDouble = ColumnEncryptionProperties
-      .builder(DOUBLE_FIELD_NAME)
+      .builder(SingleRow.DOUBLE_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[0])
       .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[0])
       .build();
     columnPropertiesMap.put(columnPropertiesDouble.getPath(), columnPropertiesDouble);
 
     ColumnEncryptionProperties columnPropertiesFloat = ColumnEncryptionProperties
-      .builder(FLOAT_FIELD_NAME)
+      .builder(SingleRow.FLOAT_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[1])
       .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[1])
       .build();
     columnPropertiesMap.put(columnPropertiesFloat.getPath(), columnPropertiesFloat);
 
     ColumnEncryptionProperties columnPropertiesBool = ColumnEncryptionProperties
-      .builder(BOOLEAN_FIELD_NAME)
+      .builder(SingleRow.BOOLEAN_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[2])
       .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[2])
       .build();
     columnPropertiesMap.put(columnPropertiesBool.getPath(), columnPropertiesBool);
 
     ColumnEncryptionProperties columnPropertiesInt32 = ColumnEncryptionProperties
-      .builder(INT32_FIELD_NAME)
+      .builder(SingleRow.INT32_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[3])
       .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[3])
       .build();
     columnPropertiesMap.put(columnPropertiesInt32.getPath(), columnPropertiesInt32);
 
     ColumnEncryptionProperties columnPropertiesBinary = ColumnEncryptionProperties
-      .builder(BINARY_FIELD_NAME)
+      .builder(SingleRow.BINARY_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[4])
       .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[4])
       .build();
     columnPropertiesMap.put(columnPropertiesBinary.getPath(), columnPropertiesBinary);
 
     ColumnEncryptionProperties columnPropertiesFixed = ColumnEncryptionProperties
-      .builder(FIXED_LENGTH_BINARY_FIELD_NAME)
+      .builder(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[5])
       .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[5])
       .build();
@@ -739,37 +640,37 @@
     Map<ColumnPath, ColumnDecryptionProperties> columnMap = new HashMap<>();
 
     ColumnDecryptionProperties columnDecryptionPropsDouble = ColumnDecryptionProperties
-      .builder(DOUBLE_FIELD_NAME)
+      .builder(SingleRow.DOUBLE_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[0])
       .build();
     columnMap.put(columnDecryptionPropsDouble.getPath(), columnDecryptionPropsDouble);
 
     ColumnDecryptionProperties columnDecryptionPropsFloat = ColumnDecryptionProperties
-      .builder(FLOAT_FIELD_NAME)
+      .builder(SingleRow.FLOAT_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[1])
       .build();
     columnMap.put(columnDecryptionPropsFloat.getPath(), columnDecryptionPropsFloat);
 
     ColumnDecryptionProperties columnDecryptionPropsBool = ColumnDecryptionProperties
-      .builder(BOOLEAN_FIELD_NAME)
+      .builder(SingleRow.BOOLEAN_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[2])
       .build();
     columnMap.put(columnDecryptionPropsBool.getPath(), columnDecryptionPropsBool);
 
     ColumnDecryptionProperties columnDecryptionPropsInt32 = ColumnDecryptionProperties
-      .builder(INT32_FIELD_NAME)
+      .builder(SingleRow.INT32_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[3])
       .build();
     columnMap.put(columnDecryptionPropsInt32.getPath(), columnDecryptionPropsInt32);
 
     ColumnDecryptionProperties columnDecryptionPropsBinary = ColumnDecryptionProperties
-      .builder(BINARY_FIELD_NAME)
+      .builder(SingleRow.BINARY_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[4])
       .build();
     columnMap.put(columnDecryptionPropsBinary.getPath(), columnDecryptionPropsBinary);
 
     ColumnDecryptionProperties columnDecryptionPropsFixed = ColumnDecryptionProperties
-      .builder(FIXED_LENGTH_BINARY_FIELD_NAME)
+      .builder(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME)
       .withKey(COLUMN_ENCRYPTION_KEYS[5])
       .build();
     columnMap.put(columnDecryptionPropsFixed.getPath(), columnDecryptionPropsFixed);