blob: 66cabaf9f85a35fcb4d83526eed3473fb3e9b981 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parquet_test
import (
"encoding/binary"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/apache/arrow/go/v10/parquet"
"github.com/apache/arrow/go/v10/parquet/compress"
"github.com/apache/arrow/go/v10/parquet/file"
"github.com/apache/arrow/go/v10/parquet/schema"
"github.com/stretchr/testify/suite"
)
/*
* This file contains unit-tests for writing encrypted Parquet files with
* different encryption configurations.
* The files are saved in temporary folder and will be deleted after reading
* them in encryption_read_config_test.go test.
*
* A detailed description of the Parquet Modular Encryption specification can be found
* here:
* https://github.com/apache/parquet-format/blob/encryption/Encryption.md
*
* Each unit-test creates a single parquet file with eight columns using one of the
* following encryption configurations:
*
* - Encryption configuration 1: Encrypt all columns and the footer with the same key.
* (uniform encryption)
* - Encryption configuration 2: Encrypt two columns and the footer, with different
* keys.
* - Encryption configuration 3: Encrypt two columns, with different keys.
* Don’t encrypt footer (to enable legacy readers)
* - plaintext footer mode.
* - Encryption configuration 4: Encrypt two columns and the footer, with different
* keys. Supply aad_prefix for file identity
* verification.
* - Encryption configuration 5: Encrypt two columns and the footer, with different
* keys. Supply aad_prefix, and call
* disable_aad_prefix_storage to prevent file
* identity storage in file metadata.
* - Encryption configuration 6: Encrypt two columns and the footer, with different
* keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
*/
var (
tempdir string
)
type EncryptionConfigTestSuite struct {
suite.Suite
pathToDoubleField string
pathToFloatField string
fileName string
numRgs int
rowsPerRG int
schema *schema.GroupNode
footerEncryptionKey string
columnEncryptionKey1 string
columnEncryptionKey2 string
}
func (en *EncryptionConfigTestSuite) encryptFile(configs *parquet.FileEncryptionProperties, filename string) {
filename = filepath.Join(tempdir, filename)
props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy), parquet.WithEncryptionProperties(configs))
outFile, err := os.Create(filename)
en.Require().NoError(err)
en.Require().NotNil(outFile)
writer := file.NewParquetWriter(outFile, en.schema, file.WithWriterProps(props))
defer writer.Close()
for r := 0; r < en.numRgs; r++ {
var (
bufferedMode = r%2 == 0
rgr file.RowGroupWriter
colIndex = 0
)
if bufferedMode {
rgr = writer.AppendBufferedRowGroup()
} else {
rgr = writer.AppendRowGroup()
}
nextColumn := func() file.ColumnChunkWriter {
defer func() { colIndex++ }()
if bufferedMode {
cw, _ := rgr.(file.BufferedRowGroupWriter).Column(colIndex)
return cw
}
cw, _ := rgr.(file.SerialRowGroupWriter).NextColumn()
return cw
}
// write the bool col
boolWriter := nextColumn().(*file.BooleanColumnChunkWriter)
for i := 0; i < en.rowsPerRG; i++ {
value := (i % 2) == 0
boolWriter.WriteBatch([]bool{value}, nil, nil)
}
// write the int32 col
int32Writer := nextColumn().(*file.Int32ColumnChunkWriter)
for i := int32(0); i < int32(en.rowsPerRG); i++ {
int32Writer.WriteBatch([]int32{i}, nil, nil)
}
// write the int64 column, each row repeats twice
int64Writer := nextColumn().(*file.Int64ColumnChunkWriter)
for i := 0; i < 2*en.rowsPerRG; i++ {
var (
defLevel = [1]int16{1}
repLevel = [1]int16{0}
value int64 = int64(i) * 1000 * 1000 * 1000 * 1000
)
if i%2 == 0 {
repLevel[0] = 1
}
int64Writer.WriteBatch([]int64{value}, defLevel[:], repLevel[:])
}
// write the int96 col
int96Writer := nextColumn().(*file.Int96ColumnChunkWriter)
for i := 0; i < en.rowsPerRG; i++ {
val := parquet.Int96{}
binary.LittleEndian.PutUint32(val[:], uint32(i))
binary.LittleEndian.PutUint32(val[4:], uint32(i+1))
binary.LittleEndian.PutUint32(val[8:], uint32(i+2))
int96Writer.WriteBatch([]parquet.Int96{val}, nil, nil)
}
// write the float column
floatWriter := nextColumn().(*file.Float32ColumnChunkWriter)
for i := 0; i < en.rowsPerRG; i++ {
val := float32(i) * 1.1
floatWriter.WriteBatch([]float32{val}, nil, nil)
}
// write the double column
doubleWriter := nextColumn().(*file.Float64ColumnChunkWriter)
for i := 0; i < en.rowsPerRG; i++ {
value := float64(i) * 1.1111111
doubleWriter.WriteBatch([]float64{value}, nil, nil)
}
// write the bytearray column. make every alternate value NULL
baWriter := nextColumn().(*file.ByteArrayColumnChunkWriter)
for i := 0; i < en.rowsPerRG; i++ {
var (
hello = []byte{'p', 'a', 'r', 'q', 'u', 'e', 't', 0, 0, 0}
)
hello[7] = byte(int('0') + i/100)
hello[8] = byte(int('0') + (i/10)%10)
hello[9] = byte(int('0') + i%10)
if i%2 == 0 {
baWriter.WriteBatch([]parquet.ByteArray{hello}, []int16{1}, nil)
} else {
baWriter.WriteBatch([]parquet.ByteArray{nil}, []int16{0}, nil)
}
}
// write fixedlength byte array column
flbaWriter := nextColumn().(*file.FixedLenByteArrayColumnChunkWriter)
for i := 0; i < en.rowsPerRG; i++ {
v := byte(i)
value := parquet.FixedLenByteArray{v, v, v, v, v, v, v, v, v, v}
flbaWriter.WriteBatch([]parquet.FixedLenByteArray{value}, nil, nil)
}
}
}
func (en *EncryptionConfigTestSuite) SetupSuite() {
var err error
tempdir, err = ioutil.TempDir("", "parquet-encryption-test-*")
en.Require().NoError(err)
fmt.Println(tempdir)
en.fileName = FileName
en.rowsPerRG = 50
en.numRgs = 5
en.pathToDoubleField = "double_field"
en.pathToFloatField = "float_field"
en.footerEncryptionKey = FooterEncryptionKey
en.columnEncryptionKey1 = ColumnEncryptionKey1
en.columnEncryptionKey2 = ColumnEncryptionKey2
fields := make(schema.FieldList, 0)
// create a primitive node named "boolean_field" with type BOOLEAN
// repetition:REQUIRED
fields = append(fields, schema.NewBooleanNode("boolean_field", parquet.Repetitions.Required, -1))
// create a primitive node named "int32_field" with type INT32 repetition REQUIRED
// and logical type: TIME_MILLIS
f, _ := schema.NewPrimitiveNodeLogical("int32_field", parquet.Repetitions.Required,
schema.NewTimeLogicalType(true, schema.TimeUnitMillis), parquet.Types.Int32, 0, -1)
fields = append(fields, f)
// create a primitive node named "int64_field" with type int64, repetition:REPEATED
fields = append(fields, schema.NewInt64Node("int64_field", parquet.Repetitions.Repeated, -1))
fields = append(fields,
schema.NewInt96Node("int96_field", parquet.Repetitions.Required, -1),
schema.NewFloat32Node("float_field", parquet.Repetitions.Required, -1),
schema.NewFloat64Node("double_field", parquet.Repetitions.Required, -1))
// create a primitive node named ba_field with type:BYTE_ARRAY repetition:OPTIONAL
fields = append(fields, schema.NewByteArrayNode("ba_field", parquet.Repetitions.Optional, -1))
// create a primitive node for flba_field
fields = append(fields, schema.NewFixedLenByteArrayNode("flba_field", parquet.Repetitions.Required, 10, -1))
// flba_field fixedlenbytearray
en.schema, _ = schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
}
// Encryption Config 1: Encrypt All columns and the footer with the same key
// (uniform encryption)
func (en *EncryptionConfigTestSuite) TestUniformEncryption() {
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"))
en.encryptFile(props, "tmp_uniform_encryption.parquet.encrypted")
}
// Encryption config 2: Encrypt Two Columns and the Footer, with different keys
func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooter() {
encryptCols := make(parquet.ColumnPathToEncryptionPropsMap)
encryptCols[en.pathToDoubleField] = parquet.NewColumnEncryptionProperties(en.pathToDoubleField, parquet.WithKey(en.columnEncryptionKey1), parquet.WithKeyID("kc1"))
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols))
en.encryptFile(props, "tmp_encrypt_columns_and_footer.parquet.encrypted")
}
// Encryption Config 3: encrypt two columns, with different keys.
// plaintext footer
// (plaintext footer mode, readable by legacy readers)
func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsPlaintextFooter() {
encryptCols := make(parquet.ColumnPathToEncryptionPropsMap)
encryptCols[en.pathToDoubleField] = parquet.NewColumnEncryptionProperties(en.pathToDoubleField, parquet.WithKey(en.columnEncryptionKey1), parquet.WithKeyID("kc1"))
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithPlaintextFooter())
en.encryptFile(props, "tmp_encrypt_columns_plaintext_footer.parquet.encrypted")
}
// Encryption Config 4: Encrypt two columns and the footer, with different keys
// use aad_prefix
func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefix() {
encryptCols := make(parquet.ColumnPathToEncryptionPropsMap)
encryptCols[en.pathToDoubleField] = parquet.NewColumnEncryptionProperties(en.pathToDoubleField, parquet.WithKey(en.columnEncryptionKey1), parquet.WithKeyID("kc1"))
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAadPrefix(en.fileName))
en.encryptFile(props, "tmp_encrypt_columns_and_footer_aad.parquet.encrypted")
}
// Encryption Config 5: Encrypt Two columns and the footer, with different keys
// use aad_prefix and disable_aad_prefix_storage
func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterWithAadPrefixDisableAadStorage() {
encryptCols := make(parquet.ColumnPathToEncryptionPropsMap)
encryptCols[en.pathToDoubleField] = parquet.NewColumnEncryptionProperties(en.pathToDoubleField, parquet.WithKey(en.columnEncryptionKey1), parquet.WithKeyID("kc1"))
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithAadPrefix(en.fileName), parquet.DisableAadPrefixStorage())
en.encryptFile(props, "tmp_encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted")
}
// Encryption Config 6: Encrypt two columns and the footer, with different keys.
// Use AES_GCM_CTR_V1
func (en *EncryptionConfigTestSuite) TestEncryptTwoColumnsAndFooterAesGcmCtr() {
encryptCols := make(parquet.ColumnPathToEncryptionPropsMap)
encryptCols[en.pathToDoubleField] = parquet.NewColumnEncryptionProperties(en.pathToDoubleField, parquet.WithKey(en.columnEncryptionKey1), parquet.WithKeyID("kc1"))
encryptCols[en.pathToFloatField] = parquet.NewColumnEncryptionProperties(en.pathToFloatField, parquet.WithKey(en.columnEncryptionKey2), parquet.WithKeyID("kc2"))
props := parquet.NewFileEncryptionProperties(en.footerEncryptionKey, parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr))
en.encryptFile(props, "tmp_encrypt_columns_and_footer_ctr.parquet.encrypted")
}
func TestFileEncryption(t *testing.T) {
suite.Run(t, new(EncryptionConfigTestSuite))
}