blob: 1a9bf56c09b3529ba49b4866b85bcdc945b89d28 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! See `main.rs` for how to run it.
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use base64::Engine;
use datafusion::common::extensions_options;
use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::execution::parquet_encryption::EncryptionFactory;
use datafusion::parquet::encryption::decrypt::KeyRetriever;
use datafusion::parquet::encryption::{
decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties,
};
use datafusion::prelude::SessionContext;
use futures::StreamExt;
use object_store::path::Path;
use rand::rand_core::{OsRng, TryRngCore};
use std::collections::HashSet;
use std::sync::Arc;
use tempfile::TempDir;
const ENCRYPTION_FACTORY_ID: &str = "example.mock_kms_encryption";
/// This example demonstrates reading and writing Parquet files that
/// are encrypted using Parquet Modular Encryption.
///
/// Compared to the `parquet_encrypted` example, where AES keys
/// are specified directly, this example implements an `EncryptionFactory` that
/// generates encryption keys dynamically per file.
/// Encryption key metadata is stored inline in the Parquet files and is used to determine
/// the decryption keys when reading the files.
///
/// In this example, encryption keys are simply stored base64 encoded in the Parquet metadata,
/// which is not a secure way to store encryption keys.
/// For production use, it is recommended to use a key-management service (KMS) to encrypt
/// data encryption keys.
pub async fn parquet_encrypted_with_kms() -> Result<()> {
    let ctx = SessionContext::new();

    // Register an `EncryptionFactory` implementation with the runtime environment.
    // Factories are registered under a name so configuration options can refer to
    // them later; multiple factories may be registered to handle different ways of
    // encrypting Parquet.
    ctx.runtime_env().register_parquet_encryption_factory(
        ENCRYPTION_FACTORY_ID,
        Arc::new(TestEncryptionFactory::default()),
    );

    // Build and register a small in-memory table to exercise the round trips.
    let col_a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let col_b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
    let col_c: ArrayRef = Arc::new(Int32Array::from(vec![2, 20, 20, 200]));
    let batch =
        RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b), ("c", col_c)])?;
    ctx.register_batch("test_data", batch)?;

    // Round trip 1: write and read encrypted Parquet with the programmatic API.
    {
        let tmpdir = TempDir::new()?;
        let table_path = format!("{}/", tmpdir.path().to_str().unwrap());
        write_encrypted(&ctx, &table_path).await?;
        read_encrypted(&ctx, &table_path).await?;
    }

    // Round trip 2: write and read encrypted Parquet with the SQL API.
    {
        let tmpdir = TempDir::new()?;
        let table_path = format!("{}/", tmpdir.path().to_str().unwrap());
        write_encrypted_with_sql(&ctx, &table_path).await?;
        read_encrypted_with_sql(&ctx, &table_path).await?;
    }

    Ok(())
}
/// Write an encrypted Parquet file
async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> {
    let df = ctx.table("test_data").await?;

    // Request Parquet encryption by naming the registered encryption factory and
    // providing its factory-specific configuration. Our factory only needs to
    // know which columns to encrypt.
    let config = EncryptionConfig {
        encrypted_columns: "b,c".to_owned(),
    };
    let mut parquet_options = TableParquetOptions::new();
    parquet_options
        .crypto
        .configure_factory(ENCRYPTION_FACTORY_ID, &config);

    df.write_parquet(
        table_path,
        DataFrameWriteOptions::new(),
        Some(parquet_options),
    )
    .await?;

    println!("Encrypted Parquet written to {table_path}");
    Ok(())
}
/// Read from an encrypted Parquet file
///
/// Registers a listing table over `table_path` configured with our encryption
/// factory, then streams all batches out of it.
async fn read_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> {
    // Specify the encryption factory to use for decrypting Parquet.
    // No additional configuration options are required when reading, as the key
    // metadata stored in the Parquet files determines the decryption keys.
    let mut parquet_options = TableParquetOptions::new();
    parquet_options
        .crypto
        .configure_factory(ENCRYPTION_FACTORY_ID, &EncryptionConfig::default());

    let file_format = ParquetFormat::default().with_options(parquet_options);
    let listing_options = ListingOptions::new(Arc::new(file_format));
    // Note: the options are moved directly into the table registration; the
    // original code cloned them needlessly.
    ctx.register_listing_table(
        "encrypted_parquet_table",
        &table_path,
        listing_options,
        None,
        None,
    )
    .await?;

    let mut batch_stream = ctx
        .table("encrypted_parquet_table")
        .await?
        .execute_stream()
        .await?;
    println!("Reading encrypted Parquet as a RecordBatch stream");
    while let Some(batch) = batch_stream.next().await {
        let batch = batch?;
        println!("Read batch with {} rows", batch.num_rows());
    }
    println!("Finished reading");
    Ok(())
}
/// Write an encrypted Parquet file using only SQL syntax with string configuration
///
/// Encryption is configured entirely through string options:
/// `format.crypto.factory_id` selects the registered encryption factory, and
/// `format.crypto.factory_options.*` entries are passed through to it.
async fn write_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> {
    // All lines use `\` continuations so the statement is a single line with no
    // embedded newlines (the original omitted one continuation after
    // `STORED AS parquet`, leaving a stray newline in the SQL text).
    let query = format!(
        "COPY test_data \
        TO '{table_path}' \
        STORED AS parquet \
        OPTIONS (\
        'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \
        'format.crypto.factory_options.encrypted_columns' 'b,c' \
        )"
    );
    // COPY returns a row-count result; it is not needed here.
    let _ = ctx.sql(&query).await?.collect().await?;
    println!("Encrypted Parquet written to {table_path}");
    Ok(())
}
/// Read from an encrypted Parquet file using only the SQL API and string-based configuration
async fn read_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> {
    // Register the table via DDL. Only the encryption factory id needs to be
    // configured; the key metadata in the files drives decryption.
    let ddl = format!(
        "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \
        STORED AS PARQUET LOCATION '{table_path}' OPTIONS (\
        'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}' \
        )"
    );
    ctx.sql(&ddl).await?;

    let mut stream = ctx
        .sql("SELECT * FROM encrypted_parquet_table_2")
        .await?
        .execute_stream()
        .await?;
    println!("Reading encrypted Parquet as a RecordBatch stream");
    while let Some(batch) = stream.next().await {
        println!("Read batch with {} rows", batch?.num_rows());
    }
    println!("Finished reading");
    Ok(())
}
// Options used to configure our example encryption factory.
// The `extensions_options!` macro generates the boilerplate that lets this
// struct be populated from DataFusion's string-based configuration
// (e.g. the `format.crypto.factory_options.*` SQL options).
extensions_options! {
    struct EncryptionConfig {
        /// Comma-separated list of columns to encrypt.
        /// Defaults to the empty string, meaning no per-column keys are configured.
        pub encrypted_columns: String, default = "".to_owned()
    }
}
/// Mock implementation of an `EncryptionFactory` that stores encryption keys
/// base64 encoded in the Parquet encryption metadata.
/// The factory itself is stateless; a fresh random key is generated per file
/// in `get_file_encryption_properties`.
/// For production use, integrating with a key-management service to encrypt
/// data encryption keys is recommended.
#[derive(Default, Debug)]
struct TestEncryptionFactory {}
/// `EncryptionFactory` is a DataFusion trait for types that generate
/// file encryption and decryption properties.
#[async_trait]
impl EncryptionFactory for TestEncryptionFactory {
    /// Generate file encryption properties to use when writing a Parquet file.
    /// The `schema` is provided so that it may be used to dynamically configure
    /// per-column encryption keys.
    /// The file path is also available. We don't use the path in this example,
    /// but other implementations may want to use this to compute an
    /// AAD prefix for the file, or to allow use of external key material
    /// (where key metadata is stored in a JSON file alongside Parquet files).
    async fn get_file_encryption_properties(
        &self,
        options: &EncryptionFactoryOptions,
        schema: &SchemaRef,
        _file_path: &Path,
    ) -> Result<Option<Arc<FileEncryptionProperties>>> {
        let config: EncryptionConfig = options.to_extension_options()?;

        // Generate a random 16-byte (AES-128) encryption key for this file.
        // The OS RNG failing is an unrecoverable environment fault, not a
        // data error, so a panic with an explanatory message is acceptable here.
        let mut key = vec![0u8; 16];
        OsRng
            .try_fill_bytes(&mut key)
            .expect("OS random number generator should be available");

        // Generate the key metadata that allows retrieving the key when reading the file.
        let key_metadata = wrap_key(&key);

        let mut builder = FileEncryptionProperties::builder(key.clone())
            .with_footer_key_metadata(key_metadata.clone());

        // Parse the configured column list. Trim whitespace and drop empty
        // entries so that the default empty string yields an empty set.
        // (A bare `split(",")` on "" yields [""], which made the emptiness
        // check below always pass.)
        let encrypted_columns: HashSet<&str> = config
            .encrypted_columns
            .split(',')
            .map(str::trim)
            .filter(|name| !name.is_empty())
            .collect();
        if !encrypted_columns.is_empty() {
            // Set up per-column encryption.
            for field in schema.fields().iter() {
                if encrypted_columns.contains(field.name().as_str()) {
                    // Here we re-use the same key for all encrypted columns,
                    // but new keys could also be generated per column.
                    builder = builder.with_column_key_and_metadata(
                        field.name().as_str(),
                        key.clone(),
                        key_metadata.clone(),
                    );
                }
            }
        }

        let encryption_properties = builder.build()?;
        Ok(Some(encryption_properties))
    }

    /// Generate file decryption properties to use when reading a Parquet file.
    /// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever`
    /// that can determine the keys using the encryption metadata.
    async fn get_file_decryption_properties(
        &self,
        _options: &EncryptionFactoryOptions,
        _file_path: &Path,
    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
        let decryption_properties =
            FileDecryptionProperties::with_key_retriever(Arc::new(TestKeyRetriever {}))
                .build()?;
        Ok(Some(decryption_properties))
    }
}
/// Mock implementation of encrypting a key that simply base64 encodes the key.
/// Note that this is not a secure way to store encryption keys,
/// and for production use keys should be encrypted with a KMS.
fn wrap_key(key: &[u8]) -> Vec<u8> {
    // `String::into_bytes` reuses the encoded string's allocation, avoiding
    // the extra copy that `.as_bytes().to_vec()` performed.
    base64::prelude::BASE64_STANDARD.encode(key).into_bytes()
}
/// Key retriever used on the read path: it recovers a data encryption key
/// from the key metadata stored in a Parquet file (see `wrap_key` for how the
/// metadata is produced in this example).
struct TestKeyRetriever {}
impl KeyRetriever for TestKeyRetriever {
    /// Get a data encryption key using the metadata stored in the Parquet file.
    /// The metadata is the base64-encoded key produced by `wrap_key`, so
    /// decoding it recovers the AES key directly.
    fn retrieve_key(
        &self,
        key_metadata: &[u8],
    ) -> datafusion::parquet::errors::Result<Vec<u8>> {
        // The metadata must be valid UTF-8 base64 text.
        let key_metadata = std::str::from_utf8(key_metadata)?;
        // Corrupt or foreign key metadata is a recoverable read error, so
        // propagate a ParquetError instead of panicking (the original unwrapped).
        base64::prelude::BASE64_STANDARD
            .decode(key_metadata)
            .map_err(|e| {
                datafusion::parquet::errors::ParquetError::General(format!(
                    "Failed to base64-decode encryption key metadata: {e}"
                ))
            })
    }
}