| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
"""A sample KmsClient implementation backed by HashiCorp Vault.

Master keys are managed by Vault's transit secrets engine.
"""
| import argparse |
| import base64 |
| import os |
| |
| import requests |
| |
| import pyarrow as pa |
| import pyarrow.parquet as pq |
| import pyarrow.parquet.encryption as pe |
| |
| |
class VaultClient(pe.KmsClient):
    """An example of a KmsClient implementation with master keys
    managed by HashiCorp Vault KMS.

    Data keys are wrapped and unwrapped through Vault's transit secrets
    engine. See Vault documentation:
    https://www.vaultproject.io/api/secret/transit
    Not for production use!
    """
    # Not used by this client's requests; kept for backward compatibility.
    JSON_MEDIA_TYPE = "application/json; charset=utf-8"
    DEFAULT_TRANSIT_ENGINE = "/v1/transit/"
    WRAP_ENDPOINT = "encrypt/"
    UNWRAP_ENDPOINT = "decrypt/"
    TOKEN_HEADER = "X-Vault-Token"
    # Seconds to wait for Vault before giving up. Without a timeout,
    # requests can block indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def __init__(self, kms_connection_config):
        """Create a VaultClient instance.

        Parameters
        ----------
        kms_connection_config : KmsConnectionConfig
            configuration parameters to connect to vault,
            e.g. URL and access token
        """
        super().__init__()
        # Drop trailing slashes so "https://vault:8200/" does not turn
        # into a ".../​/v1/transit/" path with a doubled slash.
        base_url = kms_connection_config.kms_instance_url.rstrip('/')
        self.kms_url = base_url + VaultClient.DEFAULT_TRANSIT_ENGINE
        self.kms_connection_config = kms_connection_config

    def _post(self, endpoint, master_key_identifier, payload):
        """POST payload to a transit endpoint for the given master key.

        Returns the ``data`` object of Vault's JSON response. Raises
        ``requests.HTTPError`` for a non-success HTTP status.
        """
        headers = {VaultClient.TOKEN_HEADER:
                   self.kms_connection_config.key_access_token}
        response = requests.post(
            self.kms_url + endpoint + master_key_identifier,
            headers=headers,
            data=payload,
            timeout=VaultClient.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()['data']

    def wrap_key(self, key_bytes, master_key_identifier):
        """Call Vault to wrap key key_bytes with key
        identified by master_key_identifier.

        Parameters
        ----------
        key_bytes : bytes
            The data key to wrap. It is sent base64-encoded.
        master_key_identifier : str
            Name of the Vault transit key used for wrapping.

        Returns
        -------
        The ``ciphertext`` field of Vault's response.
        """
        data = self._post(VaultClient.WRAP_ENDPOINT, master_key_identifier,
                          {'plaintext': base64.b64encode(key_bytes)})
        return data['ciphertext']

    def unwrap_key(self, wrapped_key, master_key_identifier):
        """Call Vault to unwrap wrapped_key with key
        identified by master_key_identifier.

        Parameters
        ----------
        wrapped_key
            The ciphertext previously produced by ``wrap_key``.
        master_key_identifier : str
            Name of the Vault transit key used for unwrapping.

        Returns
        -------
        bytes
            The base64-decoded ``plaintext`` field of Vault's response.
        """
        data = self._post(VaultClient.UNWRAP_ENDPOINT, master_key_identifier,
                          {'ciphertext': wrapped_key})
        return base64.b64decode(data['plaintext'])
| |
| |
def parquet_write_read_with_vault(parquet_filename):
    """An example for writing an encrypted parquet and reading an
    encrypted parquet using master keys managed by HashiCorp Vault KMS.

    Note that for this implementation the requests dependency is needed
    and the environment variables VAULT_URL and VAULT_TOKEN must be set.
    Please enable the transit engine.

    Parameters
    ----------
    parquet_filename : str
        Path of the encrypted parquet file to write and read back.

    Raises
    ------
    ValueError
        If VAULT_URL or VAULT_TOKEN is unset or empty.
    AssertionError
        If the table read back differs from the table written.
    """
    # Fail fast with a clear message. Otherwise an empty URL or token
    # would only fail later, inside the KMS client, during encryption.
    missing = [name for name in ('VAULT_URL', 'VAULT_TOKEN')
               if not os.environ.get(name)]
    if missing:
        raise ValueError("Environment variable(s) must be set: "
                         + ", ".join(missing))

    table = pa.Table.from_pydict({
        'a': pa.array([1, 2, 3]),
        'b': pa.array(['a', 'b', 'c']),
        'c': pa.array(['x', 'y', 'z'])
    })

    # Encrypt the footer with the footer key,
    # encrypt column `a` with one key
    # and column `b` with another key,
    # keep `c` plaintext
    footer_key_name = "footer_key"
    col_a_key_name = "col_a_key"
    col_b_key_name = "col_b_key"

    encryption_config = pe.EncryptionConfiguration(
        footer_key=footer_key_name,
        column_keys={
            col_a_key_name: ["a"],
            col_b_key_name: ["b"],
        })

    kms_connection_config = pe.KmsConnectionConfig(
        kms_instance_url=os.environ['VAULT_URL'],
        key_access_token=os.environ['VAULT_TOKEN'],
    )

    def kms_factory(kms_connection_configuration):
        return VaultClient(kms_connection_configuration)

    # Write with encryption properties
    crypto_factory = pe.CryptoFactory(kms_factory)
    file_encryption_properties = crypto_factory.file_encryption_properties(
        kms_connection_config, encryption_config)
    with pq.ParquetWriter(parquet_filename,
                          table.schema,
                          encryption_properties=file_encryption_properties) \
            as writer:
        writer.write_table(table)

    # Read with decryption properties
    file_decryption_properties = crypto_factory.file_decryption_properties(
        kms_connection_config)
    result = pq.ParquetFile(
        parquet_filename, decryption_properties=file_decryption_properties)
    result_table = result.read()
    # Explicit check rather than `assert`, so the round trip is still
    # verified when Python runs with -O.
    if not table.equals(result_table):
        raise AssertionError(
            "Table read back from %s does not match the table written"
            % parquet_filename)
| |
| |
def main(argv=None):
    """Parse command-line arguments and run the Vault encryption example.

    Parameters
    ----------
    argv : list of str, optional
        Arguments to parse. When None, argparse falls back to
        ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Write and read an encrypted parquet using master keys "
        "managed by Hashicorp Vault.\nBefore using please enable the "
        "transit engine in Vault and set VAULT_URL and VAULT_TOKEN "
        "environment variables.")
    # %(default)s keeps the help text in sync with the real default value.
    parser.add_argument('--filename', dest='filename', type=str,
                        default='/tmp/encrypted_table.vault.parquet',
                        help='Filename of the parquet file to be created '
                             '(default: %(default)s)')
    args = parser.parse_args(argv)
    parquet_write_read_with_vault(args.filename)
| |
| |
# Run the example only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()