PARQUET-1939: Fix remote KMS client ambiguity (#841)

diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index 87beeb9..2ad1e09 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -418,13 +418,6 @@
 
 ---
 
-**Property:** `parquet.encryption.wrap.locally`  
-**Description:** Wrap keys locally - master keys are fetched from the KMS server and used to encrypt other keys (DEKs or KEKs).
-If `false` - key wrapping will be performed by a KMS server.  
-**Default value:** `false`
-
----
-
 **Property:** `parquet.encryption.key.material.store.internally`  
 **Description:** Store key material inside Parquet file footers; this mode doesn’t produce additional files. 
 If `false`, key material is stored in separate new files, created in the same folder - this mode enables key rotation for immutable Parquet files.  
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
index 668d9db..ea8a026 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/KeyToolkit.java
@@ -69,11 +69,6 @@
    */
   public static final String CACHE_LIFETIME_PROPERTY_NAME = "parquet.encryption.cache.lifetime.seconds";
   /**
-   * Wrap keys locally - master keys are fetched from the KMS server and used to encrypt other keys (DEKs or KEKs).
-   * By default, false - key wrapping will be performed by a KMS server.
-   */
-  public static final String WRAP_LOCALLY_PROPERTY_NAME = "parquet.encryption.wrap.locally";
-  /**
    * Store key material inside Parquet file footers; this mode doesn’t produce additional files.
    * By default, true. If set to false, key material is stored in separate files in the same folder,
    * which enables key rotation for immutable Parquet files.
@@ -92,7 +87,6 @@
 
   public static final boolean DOUBLE_WRAPPING_DEFAULT = true;
   public static final long CACHE_LIFETIME_DEFAULT_SECONDS = 10 * 60; // 10 minutes
-  public static final boolean WRAP_LOCALLY_DEFAULT = false;
   public static final boolean KEY_MATERIAL_INTERNAL_DEFAULT = true;
   public static final int DATA_KEY_LENGTH_DEFAULT = 128;
   public static final int KEK_LENGTH_DEFAULT = 128;
@@ -216,10 +210,6 @@
       throw new UnsupportedOperationException("Key rotation is not supported for internal key material");
     }
 
-    if (hadoopConfig.getBoolean(WRAP_LOCALLY_PROPERTY_NAME, WRAP_LOCALLY_DEFAULT)) {
-      throw new UnsupportedOperationException("Key rotation is not supported for local key wrapping");
-    }
-
     // If process wrote files with double-wrapped keys, clean KEK cache (since master keys are changing).
     // Only once for each key rotation cycle; not for every folder
     long currentTime = System.currentTimeMillis();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/LocalWrapKmsClient.java
similarity index 61%
rename from parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/LocalWrapKmsClient.java
index c8a7435..921a2db 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/RemoteKmsClient.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/LocalWrapKmsClient.java
@@ -33,18 +33,21 @@
 import java.util.concurrent.ConcurrentHashMap;

 import java.util.concurrent.ConcurrentMap;

 

-import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;

-

-public abstract class RemoteKmsClient implements KmsClient {

+/**

+ * Typically, KMS systems support in-server key wrapping. Their clients should implement KmsClient interface directly.

+ * An extension of the LocalWrapKmsClient class should used only in rare situations where in-server wrapping is not

+ * supported. The wrapping will be done locally then - the MEKs will be fetched from the KMS server via the

+ * getMasterKeyFromServer function, and used to encrypt a DEK or KEK inside the LocalWrapKmsClient code.

+ * Note: master key rotation is not supported with local wrapping.

+ */

+public abstract class LocalWrapKmsClient implements KmsClient {

 

   public static final String LOCAL_WRAP_NO_KEY_VERSION = "NO_VERSION";

 

   protected String kmsInstanceID;

   protected String kmsInstanceURL;

   protected String kmsToken;

-  protected Boolean isWrapLocally;

   protected Configuration hadoopConfiguration;

-  protected boolean isDefaultToken;

 

   // MasterKey cache: master keys per key ID (per KMS Client). For local wrapping only.

   private ConcurrentMap<String, byte[]> masterKeyCache;

@@ -113,102 +116,52 @@
   public void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken) {

     this.kmsInstanceID = kmsInstanceID;

     this.kmsInstanceURL = kmsInstanceURL;

-

-    this.isWrapLocally = configuration.getBoolean(KeyToolkit.WRAP_LOCALLY_PROPERTY_NAME, KeyToolkit.WRAP_LOCALLY_DEFAULT);

-    if (isWrapLocally) {

-      masterKeyCache = new ConcurrentHashMap<>();

-    }

-

+    

+    masterKeyCache = new ConcurrentHashMap<>();

     hadoopConfiguration = configuration;

     kmsToken = accessToken;

 

-    isDefaultToken = kmsToken.equals(KmsClient.KEY_ACCESS_TOKEN_DEFAULT);

-

     initializeInternal();

   }

 

   @Override

   public String wrapKey(byte[] key, String masterKeyIdentifier) throws KeyAccessDeniedException {

-    if (isWrapLocally) {

-      byte[] masterKey =  masterKeyCache.computeIfAbsent(masterKeyIdentifier,

+    byte[] masterKey =  masterKeyCache.computeIfAbsent(masterKeyIdentifier,

           (k) -> getKeyFromServer(masterKeyIdentifier));

-      byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);

-      String encryptedEncodedKey =  KeyToolkit.encryptKeyLocally(key, masterKey, AAD);

-      return LocalKeyWrap.createSerialized(encryptedEncodedKey);

-    } else {

-      refreshToken();

-      return wrapKeyInServer(key, masterKeyIdentifier);

-    }

+    byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);

+    String encryptedEncodedKey =  KeyToolkit.encryptKeyLocally(key, masterKey, AAD);

+    return LocalKeyWrap.createSerialized(encryptedEncodedKey);

   }

 

   @Override

   public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier) throws KeyAccessDeniedException {

-    if (isWrapLocally) {

-      LocalKeyWrap keyWrap = LocalKeyWrap.parse(wrappedKey);

-      String masterKeyVersion = keyWrap.getMasterKeyVersion();

-      if (!LOCAL_WRAP_NO_KEY_VERSION.equals(masterKeyVersion)) {

-        throw new ParquetCryptoRuntimeException("Master key versions are not supported for local wrapping: "

-          + masterKeyVersion);

-      }

-      String encryptedEncodedKey = keyWrap.getEncryptedKey();

-      byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,

-          (k) -> getKeyFromServer(masterKeyIdentifier));

-      byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);

-      return KeyToolkit.decryptKeyLocally(encryptedEncodedKey, masterKey, AAD);

-    } else {

-      refreshToken();

-      return unwrapKeyInServer(wrappedKey, masterKeyIdentifier);

+    LocalKeyWrap keyWrap = LocalKeyWrap.parse(wrappedKey);

+    String masterKeyVersion = keyWrap.getMasterKeyVersion();

+    if (!LOCAL_WRAP_NO_KEY_VERSION.equals(masterKeyVersion)) {

+      throw new ParquetCryptoRuntimeException("Master key versions are not supported for local wrapping: "

+        + masterKeyVersion);

     }

-  }

-

-  private void refreshToken() {

-    if (isDefaultToken) {

-      return;

-    }

-    kmsToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME);

-    if (stringIsEmpty(kmsToken)) {

-      throw new ParquetCryptoRuntimeException("Empty token");

-    }

+    String encryptedEncodedKey = keyWrap.getEncryptedKey();

+    byte[] masterKey = masterKeyCache.computeIfAbsent(masterKeyIdentifier,

+        (k) -> getKeyFromServer(masterKeyIdentifier));

+    byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);

+    return KeyToolkit.decryptKeyLocally(encryptedEncodedKey, masterKey, AAD);

   }

 

   private byte[] getKeyFromServer(String keyIdentifier) {

-    refreshToken();

-    return getMasterKeyFromServer(keyIdentifier);

+    // refresh token

+    kmsToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME);

+    byte[] key = getMasterKeyFromServer(keyIdentifier);

+    int keyLength = key.length;

+    if (!(16 == keyLength || 24 == keyLength || 32 == keyLength)) {

+      throw new ParquetCryptoRuntimeException( "Wrong length: "+ keyLength +

+          " of AES key: "  + keyIdentifier);

+    }

+    return key;

   }

 

   /**

-   * Wrap a key with the master key in the remote KMS server.

-   * 

-   * If your KMS client code throws runtime exceptions related to access/permission problems

-   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.

-   * 

-   * @param keyBytes: key bytes to be wrapped

-   * @param masterKeyIdentifier: a string that uniquely identifies the master key in a KMS instance

-   * @return wrappedKey: Encrypts key bytes with the master key, encodes the result  and potentially adds a KMS-specific metadata.

-   * @throws KeyAccessDeniedException unauthorized to encrypt with the given master key

-   * @throws UnsupportedOperationException KMS does not support in-server wrapping 

-   */

-  protected abstract String wrapKeyInServer(byte[] keyBytes, String masterKeyIdentifier) 

-      throws KeyAccessDeniedException, UnsupportedOperationException;

-

-  /**

-   * Unwrap a key with the master key in the remote KMS server. 

-   * 

-   * If your KMS client code throws runtime exceptions related to access/permission problems

-   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.

-   * 

-   * @param wrappedKey String produced by wrapKey operation

-   * @param masterKeyIdentifier: a string that uniquely identifies the master key in a KMS instance

-   * @return key bytes

-   * @throws KeyAccessDeniedException unauthorized to unwrap with the given master key

-   * @throws UnsupportedOperationException KMS does not support in-server unwrapping 

-   */

-  protected abstract byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier) 

-      throws KeyAccessDeniedException, UnsupportedOperationException;

-

-  /**

    * Get master key from the remote KMS server.

-   * Required only for local wrapping. No need to implement if KMS supports in-server wrapping/unwrapping.

    * 

    * If your KMS client code throws runtime exceptions related to access/permission problems

    * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.

@@ -216,14 +169,13 @@
    * @param masterKeyIdentifier: a string that uniquely identifies the master key in a KMS instance

    * @return master key bytes

    * @throws KeyAccessDeniedException unauthorized to get the master key

-   * @throws UnsupportedOperationException If not implemented, or KMS does not support key fetching 

    */

   protected abstract byte[] getMasterKeyFromServer(String masterKeyIdentifier) 

-      throws KeyAccessDeniedException, UnsupportedOperationException;

+      throws KeyAccessDeniedException;

 

   /**

    * Pass configuration with KMS-specific parameters.

    */

   protected abstract void initializeInternal() 

       throws KeyAccessDeniedException;

-}
\ No newline at end of file
+}

diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
index a9c078b..dc9b005 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
@@ -24,6 +24,7 @@
 import org.apache.parquet.crypto.keytools.KeyToolkit;
 import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
 import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS;
+import org.apache.parquet.crypto.keytools.mocks.LocalWrapInMemoryKMS;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.ParquetReader;
@@ -277,13 +278,16 @@
     conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
       PropertiesDrivenCryptoFactory.class.getName());
 
-    conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName());
+    if (test.isWrapLocally) {
+      conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, LocalWrapInMemoryKMS.class.getName());
+    } else {
+      conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName());
+    }
     conf.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, KEY_LIST);
     conf.set(InMemoryKMS.NEW_KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST);
 
     conf.setBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, test.isKeyMaterialInternalStorage);
     conf.setBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, test.isDoubleWrapping);
-    conf.setBoolean(KeyToolkit.WRAP_LOCALLY_PROPERTY_NAME, test.isWrapLocally);
     return conf;
   }
 
@@ -402,6 +406,10 @@
   private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data, ExecutorService threadPool) throws IOException {
     readFilesMultithreaded(root, data, threadPool, false/*keysRotated*/);
 
+    if (isWrapLocally) {
+      return; // key rotation is not supported with local key wrapping
+    }
+
     LOG.info("--> Start master key rotation");
     Configuration hadoopConfigForRotation =
       EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER.getHadoopConfiguration(this);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
index 1aa1b14..e6ec17d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/InMemoryKMS.java
@@ -26,8 +26,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.crypto.KeyAccessDeniedException;
 import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
-import org.apache.parquet.crypto.keytools.RemoteKmsClient;
 import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.KmsClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +35,7 @@
  * This is a mock class, built for testing only. Don't use it as an example of KmsClient implementation.
  * (VaultClient is the sample implementation).
  */
-public class InMemoryKMS extends RemoteKmsClient {
+public class InMemoryKMS implements KmsClient {
   private static final Logger LOG = LoggerFactory.getLogger(InMemoryKMS.class);
 
   public static final String KEY_LIST_PROPERTY_NAME = "parquet.encryption.key.list";
@@ -57,9 +57,9 @@
   }
 
   @Override
-  protected synchronized void initializeInternal() {
+  public synchronized void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken) {
     // Parse master  keys
-    String[] masterKeys = hadoopConfiguration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME);
+    String[] masterKeys = configuration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME);
     if (null == masterKeys || masterKeys.length == 0) {
       throw new ParquetCryptoRuntimeException("No encryption key list");
     }
@@ -91,7 +91,7 @@
   }
 
   @Override
-  protected synchronized String wrapKeyInServer(byte[] keyBytes, String masterKeyIdentifier)
+  public synchronized String wrapKey(byte[] keyBytes, String masterKeyIdentifier)
       throws KeyAccessDeniedException, UnsupportedOperationException {
 
     // Always use the latest key version for writing
@@ -104,7 +104,7 @@
   }
 
   @Override
-  protected synchronized byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier)
+  public synchronized byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier)
       throws KeyAccessDeniedException, UnsupportedOperationException {
     byte[] masterKey = masterKeyMap.get(masterKeyIdentifier);
     if (null == masterKey) {
@@ -113,11 +113,4 @@
     byte[] AAD = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8);
     return KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, AAD);
   }
-
-  @Override
-  protected synchronized byte[] getMasterKeyFromServer(String masterKeyIdentifier)
-      throws KeyAccessDeniedException, UnsupportedOperationException {
-    // Always return the latest key version
-    return newMasterKeyMap.get(masterKeyIdentifier);
-  }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/LocalWrapInMemoryKMS.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/LocalWrapInMemoryKMS.java
new file mode 100644
index 0000000..cdf27fd
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/mocks/LocalWrapInMemoryKMS.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools.mocks;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.LocalWrapKmsClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a mock class, built for testing only. Don't use it as an example of KmsClient implementation.
+ * (VaultClient is the sample implementation).
+ */
+public class LocalWrapInMemoryKMS extends LocalWrapKmsClient {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalWrapInMemoryKMS.class);
+
+  public static final String KEY_LIST_PROPERTY_NAME = "parquet.encryption.key.list";
+
+  private static Map<String,byte[]> masterKeyMap;
+
+  @Override
+  protected synchronized void initializeInternal() throws KeyAccessDeniedException {
+    // Parse master  keys
+    String[] masterKeys = hadoopConfiguration.getTrimmedStrings(KEY_LIST_PROPERTY_NAME);
+    if (null == masterKeys || masterKeys.length == 0) {
+      throw new ParquetCryptoRuntimeException("No encryption key list");
+    }
+    masterKeyMap = parseKeyList(masterKeys);
+  }
+
+  private static Map<String, byte[]> parseKeyList(String[] masterKeys) {
+    Map<String,byte[]> keyMap = new HashMap<>();
+
+    int nKeys = masterKeys.length;
+    for (int i=0; i < nKeys; i++) {
+      String[] parts = masterKeys[i].split(":");
+      String keyName = parts[0].trim();
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Key '" + keyName + "' is not formatted correctly");
+      }
+      String key = parts[1].trim();
+      try {
+        byte[] keyBytes = Base64.getDecoder().decode(key);
+        keyMap.put(keyName, keyBytes);
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Could not decode key '" + keyName + "'!");
+        throw e;
+      }
+    }
+    return keyMap;
+  }
+
+  @Override
+  protected synchronized byte[] getMasterKeyFromServer(String masterKeyIdentifier)
+      throws KeyAccessDeniedException, UnsupportedOperationException {
+    return masterKeyMap.get(masterKeyIdentifier);
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
index 1e8507a..9eb9c51 100755
--- a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
@@ -25,10 +25,11 @@
 import okhttp3.RequestBody;
 import okhttp3.Response;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.crypto.KeyAccessDeniedException;
 import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
 import org.apache.parquet.crypto.keytools.KmsClient;
-import org.apache.parquet.crypto.keytools.RemoteKmsClient;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,8 +43,9 @@
 /**
  * An example of KmsClient implementation. Not for production use!
  */
-public class VaultClient extends RemoteKmsClient {
+public class VaultClient implements KmsClient {
   private static final Logger LOG = LoggerFactory.getLogger(VaultClient.class);
+
   private static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json; charset=utf-8");
   private static final String DEFAULT_TRANSIT_ENGINE = "/v1/transit/";
   private static final String transitWrapEndpoint = "encrypt/";
@@ -51,16 +53,20 @@
   private static final String tokenHeader="X-Vault-Token";
   private static final ObjectMapper objectMapper = new ObjectMapper();
 
+  private String kmsToken;
+  private Configuration hadoopConfiguration;
+
   private String endPointPrefix;
   private OkHttpClient httpClient = new OkHttpClient.Builder()
     .connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
     .build();
 
   @Override
-  protected void initializeInternal() {
-    if (isDefaultToken) {
-      throw new ParquetCryptoRuntimeException("Vault token not provided");
-    }
+  public void initialize(Configuration configuration, String kmsInstanceID, String kmsInstanceURL, String accessToken) 
+      throws KeyAccessDeniedException {
+    hadoopConfiguration = configuration;
+    checkToken(accessToken);
+    kmsToken = accessToken;
 
     if (kmsInstanceURL.equals(KmsClient.KMS_INSTANCE_URL_DEFAULT)) {
       throw new ParquetCryptoRuntimeException("Vault URL not provided");
@@ -82,31 +88,31 @@
   }
 
   @Override
-  public String wrapKeyInServer(byte[] dataKey, String masterKeyIdentifier) {
+  public String wrapKey(byte[] keyBytes, String masterKeyIdentifier)
+      throws KeyAccessDeniedException {
+    refreshToken();
     Map<String, String> writeKeyMap = new HashMap<String, String>(1);
-    final String dataKeyStr = Base64.getEncoder().encodeToString(dataKey);
+    final String dataKeyStr = Base64.getEncoder().encodeToString(keyBytes);
     writeKeyMap.put("plaintext", dataKeyStr);
-    String response = getContentFromTransitEngine(endPointPrefix + transitWrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+    String response = getContentFromTransitEngine(endPointPrefix + transitWrapEndpoint, 
+        buildPayload(writeKeyMap), masterKeyIdentifier);
     String ciphertext = parseReturn(response, "ciphertext");
     return ciphertext;
   }
 
   @Override
-  public byte[] unwrapKeyInServer(String wrappedKey, String masterKeyIdentifier) {
+  public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier)
+      throws KeyAccessDeniedException {
+    refreshToken();
     Map<String, String> writeKeyMap = new HashMap<String, String>(1);
     writeKeyMap.put("ciphertext", wrappedKey);
-    String response = getContentFromTransitEngine(endPointPrefix + transitUnwrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+    String response = getContentFromTransitEngine(endPointPrefix + transitUnwrapEndpoint, 
+        buildPayload(writeKeyMap), masterKeyIdentifier);
     String plaintext = parseReturn(response, "plaintext");
     final byte[] key = Base64.getDecoder().decode(plaintext);
     return key;
   }
 
-  @Override
-  protected byte[] getMasterKeyFromServer(String masterKeyIdentifier) {
-    // Vault supports in-server wrapping and unwrapping. No need to fetch master keys.
-    throw new UnsupportedOperationException("Use server wrap/unwrap, instead of fetching master keys (local wrap)");
-  }
-
   private String buildPayload(Map<String, String> paramMap) {
     String jsonValue;
     try {
@@ -117,6 +123,17 @@
     return jsonValue;
   }
 
+  private void checkToken(String token) {
+    if (null == token || token.isEmpty() || token.equals(KmsClient.KEY_ACCESS_TOKEN_DEFAULT)) {
+      throw new ParquetCryptoRuntimeException("Wrong Vault token : " + token);
+    }
+  }
+
+  private void refreshToken() {
+    kmsToken = hadoopConfiguration.getTrimmed(KeyToolkit.KEY_ACCESS_TOKEN_PROPERTY_NAME);
+    checkToken(kmsToken);
+  }
+
   private String getContentFromTransitEngine(String endPoint, String jPayload, String masterKeyIdentifier) {
     LOG.info("masterKeyIdentifier: " + masterKeyIdentifier);
 
@@ -151,7 +168,6 @@
     }
   }
 
-
   private static String parseReturn(String response, String searchKey) {
     String matchingValue;
     try {