| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.parquet.crypto; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.parquet.crypto.keytools.KeyToolkit; |
| import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory; |
| import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS; |
| import org.apache.parquet.crypto.keytools.mocks.LocalWrapInMemoryKMS; |
| import org.apache.parquet.example.data.Group; |
| import org.apache.parquet.example.data.simple.SimpleGroupFactory; |
| import org.apache.parquet.hadoop.ParquetReader; |
| import org.apache.parquet.hadoop.ParquetWriter; |
| import org.apache.parquet.hadoop.example.ExampleParquetWriter; |
| import org.apache.parquet.hadoop.example.GroupReadSupport; |
| import org.apache.parquet.io.api.Binary; |
| import org.apache.parquet.schema.MessageType; |
| import org.apache.parquet.schema.Types; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ErrorCollector; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Base64; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.parquet.hadoop.ParquetFileWriter.EFMAGIC; |
| import static org.apache.parquet.hadoop.ParquetFileWriter.EF_MAGIC_STR; |
| import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC; |
| import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; |
| import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; |
| |
| /* |
| * This file contains samples for writing and reading encrypted Parquet files in different |
| * encryption and decryption configurations, set using a properties-driven interface. |
| * |
 * The write sample produces a number of Parquet files, each encrypted with a different
 * encryption configuration, as described below.
 * The name of each file has the form
 * <encryption-configuration-name>_<thread-number>.parquet.encrypted, or
 * NO_ENCRYPTION_<thread-number>.parquet for the plaintext files.
| * |
| * The read sample creates a set of decryption configurations and then uses each of them |
| * to read all encrypted files in the input directory. |
| * |
| * The different encryption and decryption configurations are listed below. |
| * |
| * |
| * A detailed description of the Parquet Modular Encryption specification can be found |
| * here: |
| * https://github.com/apache/parquet-format/blob/encryption/Encryption.md |
| * |
| * The write sample creates files with eight columns in the following |
| * encryption configurations: |
| * |
 * - ENCRYPT_COLUMNS_AND_FOOTER: Encrypt six columns and the footer, with different
 * keys.
 * - ENCRYPT_COLUMNS_PLAINTEXT_FOOTER: Encrypt six columns, with different keys.
 * Do not encrypt the footer (plaintext footer mode), so that legacy readers can still
 * read the plaintext columns.
 * - ENCRYPT_COLUMNS_AND_FOOTER_CTR: Encrypt six columns and the footer, with different
 * keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
 * - NO_ENCRYPTION: Do not encrypt anything.
| * |
| * |
| * The read sample uses each of the following decryption configurations to read every |
| * encrypted files in the input directory: |
| * |
| * - DECRYPT_WITH_KEY_RETRIEVER: Decrypt using key retriever that holds the keys of |
| * two encrypted columns and the footer key. |
| * - NO_DECRYPTION: Do not decrypt anything. |
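 *
 * For reference, a minimal sketch of the properties-driven setup exercised by this test
 * (property names are the constants used below; the key values are illustrative only):
 *
 *   Configuration conf = new Configuration();
 *   conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
 *       PropertiesDrivenCryptoFactory.class.getName());
 *   conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName());
 *   conf.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, "kf: <base64 key>, kc1: <base64 key>");
 *   conf.set(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, "kf");
 *   conf.set(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, "kc1: double_field");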
| */ |
| @RunWith(Parameterized.class) |
| public class TestPropertiesDrivenEncryption { |
| @Parameterized.Parameters(name = "Run {index}: isKeyMaterialInternalStorage={0} isDoubleWrapping={1} isWrapLocally={2}") |
| public static Collection<Object[]> data() { |
| Collection<Object[]> list = new ArrayList<>(8); |
| boolean[] flagValues = { false, true }; |
| for (boolean keyMaterialInternalStorage : flagValues) { |
| for (boolean doubleWrapping : flagValues) { |
| for (boolean wrapLocally : flagValues) { |
| Object[] vector = {keyMaterialInternalStorage, doubleWrapping, wrapLocally}; |
| list.add(vector); |
| } |
| } |
| } |
| return list; |
| } |
| |
| @Parameterized.Parameter // first data value (0) is default |
| public boolean isKeyMaterialInternalStorage; |
| |
| @Parameterized.Parameter(value = 1) |
| public boolean isDoubleWrapping; |
| |
| @Parameterized.Parameter(value = 2) |
| public boolean isWrapLocally; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestPropertiesDrivenEncryption.class); |
| |
| @Rule |
| public TemporaryFolder temporaryFolder = new TemporaryFolder(); |
| |
| @Rule |
| public ErrorCollector errorCollector = new ErrorCollector(); |
| |
| private static final Base64.Encoder encoder = Base64.getEncoder(); |
| private static final String FOOTER_MASTER_KEY = |
| encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8)); |
| private static final String[] COLUMN_MASTER_KEYS = { |
| encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("1234567890123452".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("1234567890123453".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("1234567890123454".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("1234567890123455".getBytes(StandardCharsets.UTF_8))}; |
| private static final String[] COLUMN_MASTER_KEY_IDS = { "kc1", "kc2", "kc3", "kc4", "kc5", "kc6"}; |
| private static final String FOOTER_MASTER_KEY_ID = "kf"; |
| |
| private static final String KEY_LIST = new StringBuilder() |
| .append(COLUMN_MASTER_KEY_IDS[0]).append(": ").append(COLUMN_MASTER_KEYS[0]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[1]).append(": ").append(COLUMN_MASTER_KEYS[1]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[2]).append(": ").append(COLUMN_MASTER_KEYS[2]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[3]).append(": ").append(COLUMN_MASTER_KEYS[3]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[4]).append(": ").append(COLUMN_MASTER_KEYS[4]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(COLUMN_MASTER_KEYS[5]).append(", ") |
| .append(FOOTER_MASTER_KEY_ID).append(": ").append(FOOTER_MASTER_KEY).toString(); |
| |
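// Rotated ("new") master keys, used to exercise master key rotation in the read test.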
| private static final String NEW_FOOTER_MASTER_KEY = |
| encoder.encodeToString("9123456789012345".getBytes(StandardCharsets.UTF_8)); |
| private static final String[] NEW_COLUMN_MASTER_KEYS = { |
| encoder.encodeToString("9234567890123450".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("9234567890123451".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("9234567890123452".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("9234567890123453".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("9234567890123454".getBytes(StandardCharsets.UTF_8)), |
| encoder.encodeToString("9234567890123455".getBytes(StandardCharsets.UTF_8))}; |
| |
| private static final String NEW_KEY_LIST = new StringBuilder() |
| .append(COLUMN_MASTER_KEY_IDS[0]).append(": ").append(NEW_COLUMN_MASTER_KEYS[0]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[1]).append(": ").append(NEW_COLUMN_MASTER_KEYS[1]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[2]).append(": ").append(NEW_COLUMN_MASTER_KEYS[2]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[3]).append(": ").append(NEW_COLUMN_MASTER_KEYS[3]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[4]).append(": ").append(NEW_COLUMN_MASTER_KEYS[4]).append(", ") |
| .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(NEW_COLUMN_MASTER_KEYS[5]).append(", ") |
| .append(FOOTER_MASTER_KEY_ID).append(": ").append(NEW_FOOTER_MASTER_KEY).toString(); |
| |
| private static final String COLUMN_KEY_MAPPING = new StringBuilder() |
| .append(COLUMN_MASTER_KEY_IDS[0]).append(": ").append(SingleRow.DOUBLE_FIELD_NAME).append("; ") |
| .append(COLUMN_MASTER_KEY_IDS[1]).append(": ").append(SingleRow.FLOAT_FIELD_NAME).append("; ") |
| .append(COLUMN_MASTER_KEY_IDS[2]).append(": ").append(SingleRow.BOOLEAN_FIELD_NAME).append("; ") |
| .append(COLUMN_MASTER_KEY_IDS[3]).append(": ").append(SingleRow.INT32_FIELD_NAME).append("; ") |
| .append(COLUMN_MASTER_KEY_IDS[4]).append(": ").append(SingleRow.BINARY_FIELD_NAME).append("; ") |
| .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME) |
| .toString(); |
| |
| private static final int NUM_THREADS = 4; |
| private static final int WAIT_FOR_WRITE_TO_END_SECONDS = 5; |
| private static final int WAIT_FOR_READ_TO_END_SECONDS = 5; |
| |
| private static final boolean plaintextFilesAllowed = true; |
| |
| private static final int ROW_COUNT = 10000; |
| private static final List<SingleRow> DATA = Collections.unmodifiableList(SingleRow.generateRandomData(ROW_COUNT)); |
| |
| public enum EncryptionConfiguration { |
| ENCRYPT_COLUMNS_AND_FOOTER { |
| /** |
 * Encrypt six columns and the footer, with different keys.
| */ |
| public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) { |
| Configuration conf = getCryptoProperties(test); |
| setEncryptionKeys(conf); |
| return conf; |
| } |
| }, |
| ENCRYPT_COLUMNS_PLAINTEXT_FOOTER { |
| /** |
 * Encrypt six columns, with different keys.
 * Do not encrypt the footer
 * (plaintext footer mode, readable by legacy readers).
| */ |
| public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) { |
| Configuration conf = getCryptoProperties(test); |
| setEncryptionKeys(conf); |
| conf.setBoolean(PropertiesDrivenCryptoFactory.PLAINTEXT_FOOTER_PROPERTY_NAME, true); |
| return conf; |
| } |
| }, |
| ENCRYPT_COLUMNS_AND_FOOTER_CTR { |
| /** |
 * Encrypt six columns and the footer, with different keys.
 * Use the AES_GCM_CTR_V1 algorithm.
| */ |
| public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) { |
| Configuration conf = getCryptoProperties(test); |
| setEncryptionKeys(conf); |
| conf.set(PropertiesDrivenCryptoFactory.ENCRYPTION_ALGORITHM_PROPERTY_NAME, |
| ParquetCipher.AES_GCM_CTR_V1.toString()); |
| return conf; |
| } |
| }, |
| NO_ENCRYPTION { |
| /** |
 * Do not encrypt anything.
| */ |
| public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) { |
| return null; |
| } |
| }; |
| |
public abstract Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test);
| } |
| |
| public enum DecryptionConfiguration { |
| DECRYPT_WITH_KEY_RETRIEVER { |
| /** |
 * Decrypt using a key retriever callback that holds the master keys
 * of the encrypted columns and the footer.
| */ |
| public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) { |
| Configuration conf = getCryptoProperties(test); |
| return conf; |
| } |
| }, |
| NO_DECRYPTION { |
| /** |
| * Do not decrypt anything. |
| */ |
| public Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test) { |
| return null; |
| } |
| }; |
| |
public abstract Configuration getHadoopConfiguration(TestPropertiesDrivenEncryption test);
| } |
| |
| /** |
| * Get Hadoop configuration with configuration properties common to all encryption modes |
| */ |
| private static Configuration getCryptoProperties(TestPropertiesDrivenEncryption test) { |
| Configuration conf = new Configuration(); |
| conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME, |
| PropertiesDrivenCryptoFactory.class.getName()); |
| |
| if (test.isWrapLocally) { |
| conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, LocalWrapInMemoryKMS.class.getName()); |
| } else { |
| conf.set(KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME, InMemoryKMS.class.getName()); |
| } |
| conf.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, KEY_LIST); |
| conf.set(InMemoryKMS.NEW_KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST); |
| |
| conf.setBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, test.isKeyMaterialInternalStorage); |
| conf.setBoolean(KeyToolkit.DOUBLE_WRAPPING_PROPERTY_NAME, test.isDoubleWrapping); |
| return conf; |
| } |
| |
| /** |
| * Set configuration properties to encrypt columns and the footer with different keys |
| */ |
| private static void setEncryptionKeys(Configuration conf) { |
| conf.set(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, COLUMN_KEY_MAPPING); |
| conf.set(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, FOOTER_MASTER_KEY_ID); |
| } |
| |
| |
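/**
 * Writes files with every encryption configuration and then reads them back with every
 * decryption configuration, sharing one thread pool for the concurrent writers and readers.
 */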
| @Test |
| public void testWriteReadEncryptedParquetFiles() throws IOException { |
| Path rootPath = new Path(temporaryFolder.getRoot().getPath()); |
| LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", rootPath.toString()); |
| LOG.info("Run: isKeyMaterialInternalStorage={} isDoubleWrapping={} isWrapLocally={}", |
| isKeyMaterialInternalStorage, isDoubleWrapping, isWrapLocally); |
| KeyToolkit.removeCacheEntriesForAllTokens(); |
| ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS); |
| try { |
| // Write using various encryption configurations. |
| testWriteEncryptedParquetFiles(rootPath, DATA, threadPool); |
| // Read using various decryption configurations. |
| testReadEncryptedParquetFiles(rootPath, DATA, threadPool); |
| } finally { |
| threadPool.shutdown(); |
| } |
| } |
| |
| |
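/**
 * Writes NUM_THREADS files per encryption configuration in parallel, each configuration
 * into its own sub-folder under the given root.
 */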
| private void testWriteEncryptedParquetFiles(Path root, List<SingleRow> data, ExecutorService threadPool) throws IOException { |
| EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values(); |
| for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) { |
| Path encryptionConfigurationFolderPath = new Path(root, encryptionConfiguration.name()); |
| Configuration conf = new Configuration(); |
| FileSystem fs = encryptionConfigurationFolderPath.getFileSystem(conf); |
| if (fs.exists(encryptionConfigurationFolderPath)) { |
| fs.delete(encryptionConfigurationFolderPath, true); |
| } |
| fs.mkdirs(encryptionConfigurationFolderPath); |
| |
| KeyToolkit.removeCacheEntriesForAllTokens(); |
| CountDownLatch latch = new CountDownLatch(NUM_THREADS); |
| for (int i = 0; i < NUM_THREADS; ++i) { |
| final int threadNumber = i; |
| threadPool.execute(() -> { |
| writeEncryptedParquetFile(encryptionConfigurationFolderPath, data, encryptionConfiguration, threadNumber); |
| latch.countDown(); |
| }); |
| } |
| try { |
| latch.await(WAIT_FOR_WRITE_TO_END_SECONDS, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
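/**
 * Writes a single file with the given encryption configuration, using small page and
 * row-group sizes so that several pages and row groups are created.
 */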
| private void writeEncryptedParquetFile(Path root, List<SingleRow> data, EncryptionConfiguration encryptionConfiguration, |
| int threadNumber) { |
| MessageType schema = SingleRow.getSchema(); |
| SimpleGroupFactory f = new SimpleGroupFactory(schema); |
| |
| int pageSize = data.size() / 10; // Ensure that several pages will be created |
int rowGroupSize = pageSize * 6 * 5; // Ensure that more than one row group is created
| |
| Path file = new Path(root, getFileName(root, encryptionConfiguration, threadNumber)); |
| LOG.info("\nWrite " + file.toString()); |
| Configuration conf = encryptionConfiguration.getHadoopConfiguration(this); |
| FileEncryptionProperties fileEncryptionProperties = null; |
| try { |
| if (null == conf) { |
| conf = new Configuration(); |
| } else { |
| EncryptionPropertiesFactory cryptoFactory = EncryptionPropertiesFactory.loadFactory(conf); |
| fileEncryptionProperties = cryptoFactory.getFileEncryptionProperties(conf, file, null); |
| } |
| } catch (Exception e) { |
| addErrorToErrorCollectorAndLog("Failed writing " + file.toString(), e, |
| encryptionConfiguration, null); |
| return; |
| } |
| try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file) |
| .withConf(conf) |
| .withWriteMode(OVERWRITE) |
| .withType(schema) |
| .withPageSize(pageSize) |
| .withRowGroupSize(rowGroupSize) |
| .withEncryption(fileEncryptionProperties) |
| .build()) { |
| |
| for (SingleRow singleRow : data) { |
| writer.write( |
| f.newGroup() |
| .append(SingleRow.BOOLEAN_FIELD_NAME, singleRow.boolean_field) |
| .append(SingleRow.INT32_FIELD_NAME, singleRow.int32_field) |
| .append(SingleRow.FLOAT_FIELD_NAME, singleRow.float_field) |
| .append(SingleRow.DOUBLE_FIELD_NAME, singleRow.double_field) |
| .append(SingleRow.BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.ba_field)) |
| .append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.flba_field)) |
| .append(SingleRow.PLAINTEXT_INT32_FIELD_NAME, singleRow.plaintext_int32_field)); |
| |
| } |
| } catch (Exception e) { |
| addErrorToErrorCollectorAndLog("Failed writing " + file.toString(), e, |
| encryptionConfiguration, null); |
| } |
| } |
| |
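/**
 * Builds the file name: {@code <configuration-name>_<thread-number>.parquet.encrypted},
 * or {@code <configuration-name>_<thread-number>.parquet} for the plaintext configuration.
 */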
| private Path getFileName(Path root, EncryptionConfiguration encryptionConfiguration, int threadNumber) { |
| String suffix = (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration) ? ".parquet" : ".parquet.encrypted"; |
| return new Path(root, encryptionConfiguration.toString() + "_" + threadNumber + suffix); |
| } |
| |
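/**
 * Reads all files with every decryption configuration; then, where supported, rotates the
 * master keys and reads the files again with the new keys.
 */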
| private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data, ExecutorService threadPool) throws IOException { |
| readFilesMultithreaded(root, data, threadPool, false/*keysRotated*/); |
| |
| if (isWrapLocally) { |
| return; // key rotation is not supported with local key wrapping |
| } |
| |
| LOG.info("--> Start master key rotation"); |
| Configuration hadoopConfigForRotation = |
| EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER.getHadoopConfiguration(this); |
| hadoopConfigForRotation.set(InMemoryKMS.NEW_KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST); |
| InMemoryKMS.startKeyRotation(hadoopConfigForRotation); |
| |
| EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values(); |
| for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) { |
| if (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration) { |
| continue; // no rotation of plaintext files |
| } |
| Path encryptionConfigurationFolderPath = new Path(root, encryptionConfiguration.name()); |
| try { |
| LOG.info("Rotate master keys in folder: " + encryptionConfigurationFolderPath.toString()); |
| KeyToolkit.rotateMasterKeys(encryptionConfigurationFolderPath.toString(), hadoopConfigForRotation); |
| } catch (UnsupportedOperationException e) { |
| if (isKeyMaterialInternalStorage || isWrapLocally) { |
| LOG.info("Key material file not found, as expected"); |
| } else { |
| errorCollector.addError(e); |
| } |
| return; // No use in continuing reading if rotation wasn't successful |
| } catch (Exception e) { |
| errorCollector.addError(e); |
| return; // No use in continuing reading if rotation wasn't successful |
| } |
| } |
| |
| InMemoryKMS.finishKeyRotation(); |
| LOG.info("--> Finish master key rotation"); |
| |
| LOG.info("--> Read files again with new keys"); |
| readFilesMultithreaded(root, data, threadPool, true /*keysRotated*/); |
| } |
| |
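/**
 * For each decryption configuration, reads the files written with every encryption
 * configuration, NUM_THREADS files in parallel, and verifies the results.
 */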
| private void readFilesMultithreaded(Path root, List<SingleRow> data, ExecutorService threadPool, boolean keysRotated) { |
| DecryptionConfiguration[] decryptionConfigurations = DecryptionConfiguration.values(); |
| for (DecryptionConfiguration decryptionConfiguration : decryptionConfigurations) { |
| LOG.info("\n\n"); |
| LOG.info("==> Decryption configuration {}\n", decryptionConfiguration); |
| Configuration hadoopConfig = decryptionConfiguration.getHadoopConfiguration(this); |
| if (null != hadoopConfig) { |
| KeyToolkit.removeCacheEntriesForAllTokens(); |
| } |
| |
| EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values(); |
| for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) { |
| Path encryptionConfigurationFolderPath = new Path(root, encryptionConfiguration.name()); |
| |
| CountDownLatch latch = new CountDownLatch(NUM_THREADS); |
| for (int i = 0; i < NUM_THREADS; ++i) { |
| final int threadNumber = i; |
| threadPool.execute(() -> { |
| Path file = getFileName(encryptionConfigurationFolderPath, encryptionConfiguration, threadNumber); |
| LOG.info("--> Read file {} {}", file.toString(), encryptionConfiguration); |
| readFileAndCheckResult(hadoopConfig, encryptionConfiguration, decryptionConfiguration, |
| data, file, keysRotated); |
| |
| latch.countDown(); |
| }); |
| } |
| try { |
| latch.await(WAIT_FOR_READ_TO_END_SECONDS, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
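/**
 * Reads a single file and verifies its content against the expected data. For configurations
 * with an encrypted footer, also checks that the file starts with the encrypted-footer magic
 * bytes. Exceptions are passed to checkResult, which decides whether they were expected.
 */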
| private void readFileAndCheckResult(Configuration hadoopConfig, EncryptionConfiguration encryptionConfiguration, |
| DecryptionConfiguration decryptionConfiguration, |
| List<SingleRow> data, Path file, boolean keysRotated) { |
| FileDecryptionProperties fileDecryptionProperties = null; |
| if (null == hadoopConfig) { |
| hadoopConfig = new Configuration(); |
| } else { |
| DecryptionPropertiesFactory cryptoFactory = DecryptionPropertiesFactory.loadFactory(hadoopConfig); |
| fileDecryptionProperties = cryptoFactory.getFileDecryptionProperties(hadoopConfig, file); |
| } |
| |
| // Set schema to only point to the non-encrypted columns |
| if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) && |
| (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) { |
| hadoopConfig.set("parquet.read.schema", Types.buildMessage() |
| .optional(INT32).named(SingleRow.PLAINTEXT_INT32_FIELD_NAME) |
| .named("FormatTestObject").toString()); |
| } |
| if ((encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION) && |
| (encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) { |
| byte[] magic = new byte[MAGIC.length]; |
| try (InputStream is = new FileInputStream(file.toString())) { |
| if (is.read(magic) != magic.length) { |
| throw new RuntimeException("ERROR"); |
| } |
| if (!Arrays.equals(EFMAGIC, magic)) { |
| addErrorToErrorCollectorAndLog("File doesn't start with " + EF_MAGIC_STR, encryptionConfiguration, decryptionConfiguration); |
| } |
| } catch (IOException e) { |
| addErrorToErrorCollectorAndLog("Failed to read magic string at the beginning of file", e, |
| encryptionConfiguration, decryptionConfiguration); |
| } |
| } |
| |
| if (keysRotated && (null != hadoopConfig.get(InMemoryKMS.KEY_LIST_PROPERTY_NAME))) { |
| hadoopConfig.set(InMemoryKMS.KEY_LIST_PROPERTY_NAME, NEW_KEY_LIST); |
| } |
| |
| int rowNum = 0; |
| try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file) |
| .withConf(hadoopConfig) |
| .withDecryption(fileDecryptionProperties) |
| .build()) { |
| for (Group group = reader.read(); group != null; group = reader.read()) { |
| SingleRow rowExpected = data.get(rowNum++); |
| |
| // plaintext columns |
| if (rowExpected.plaintext_int32_field != group.getInteger(SingleRow.PLAINTEXT_INT32_FIELD_NAME, 0)) { |
| addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration); |
| } |
| // encrypted columns |
| if (decryptionConfiguration != DecryptionConfiguration.NO_DECRYPTION) { |
| if (rowExpected.boolean_field != group.getBoolean(SingleRow.BOOLEAN_FIELD_NAME, 0)) { |
| addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration); |
| } |
| if (rowExpected.int32_field != group.getInteger(SingleRow.INT32_FIELD_NAME, 0)) { |
| addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration); |
| } |
| if (rowExpected.float_field != group.getFloat(SingleRow.FLOAT_FIELD_NAME, 0)) { |
| addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration); |
| } |
| if (rowExpected.double_field != group.getDouble(SingleRow.DOUBLE_FIELD_NAME, 0)) { |
| addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration); |
| } |
| if ((null != rowExpected.ba_field) && |
| !Arrays.equals(rowExpected.ba_field, group.getBinary(SingleRow.BINARY_FIELD_NAME, 0).getBytes())) { |
| addErrorToErrorCollectorAndLog("Wrong byte array", encryptionConfiguration, decryptionConfiguration); |
| } |
| if (!Arrays.equals(rowExpected.flba_field, |
| group.getBinary(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME, 0).getBytes())) { |
| addErrorToErrorCollectorAndLog("Wrong fixed-length byte array", |
| encryptionConfiguration, decryptionConfiguration); |
| } |
| } |
| } |
| } catch (Exception e) { |
| checkResult(file.getName(), decryptionConfiguration, e); |
| } |
| hadoopConfig.unset("parquet.read.schema"); |
| } |
| |
| /** |
| * Check that the decryption result is as expected. |
| */ |
| private void checkResult(String file, DecryptionConfiguration decryptionConfiguration, Exception exception) { |
| String errorMessage = exception.getMessage(); |
| String exceptionMsg = (null == errorMessage ? exception.getClass().getName() : errorMessage); |
// Extract the encryption configuration from the Parquet file name.
| EncryptionConfiguration encryptionConfiguration = getEncryptionConfigurationFromFilename(file); |
| |
| if (!plaintextFilesAllowed) { |
// The NO_ENCRYPTION configuration has a null encryptor, so the Parquet file is plaintext.
// An exception is expected when such a file is read with a decryptor.
| if (encryptionConfiguration == EncryptionConfiguration.NO_ENCRYPTION) { |
| if (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER) { |
| if (!exceptionMsg.endsWith("Applying decryptor on plaintext file")) { |
| addErrorToErrorCollectorAndLog("Expecting exception Applying decryptor on plaintext file", |
| exceptionMsg, encryptionConfiguration, decryptionConfiguration); |
| } else { |
| LOG.info("Exception as expected: " + exceptionMsg); |
| } |
| return; |
| } |
| } |
| } |
// The NO_DECRYPTION configuration has a null decryptor, so only plaintext files can be read.
// An exception is expected if the file is encrypted.
| if (decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) { |
| if ((encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION && |
| encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) { |
| if (!exceptionMsg.endsWith("No encryption key list") && !exceptionMsg.endsWith("No keys available")) { |
| addErrorToErrorCollectorAndLog("Expecting No keys available exception", exceptionMsg, |
| encryptionConfiguration, decryptionConfiguration); |
| } else { |
| LOG.info("Exception as expected: " + exceptionMsg); |
| } |
| return; |
| } |
| } |
| exception.printStackTrace(); |
| addErrorToErrorCollectorAndLog("Didn't expect an exception", exceptionMsg, |
| encryptionConfiguration, decryptionConfiguration); |
| } |
| |
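/**
 * Derives the encryption configuration from the file name; returns null for plaintext files
 * (names that do not end with .parquet.encrypted).
 */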
| private EncryptionConfiguration getEncryptionConfigurationFromFilename(String file) { |
| if (!file.endsWith(".parquet.encrypted")) { |
| return null; |
| } |
String fileNamePrefix = file.replaceFirst("(.*)_[0-9]+\\.parquet\\.encrypted", "$1");
| try { |
| EncryptionConfiguration encryptionConfiguration = EncryptionConfiguration.valueOf(fileNamePrefix.toUpperCase()); |
| return encryptionConfiguration; |
| } catch (IllegalArgumentException e) { |
| LOG.error("File name doesn't match any known encryption configuration: " + file); |
| synchronized (errorCollector) { |
| errorCollector.addError(e); |
| } |
| return null; |
| } |
| } |
| |
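/**
 * Records an error, together with the actual exception message, in the error collector
 * and logs it.
 */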
| private void addErrorToErrorCollectorAndLog(String errorMessage, String exceptionMessage, EncryptionConfiguration encryptionConfiguration, |
| DecryptionConfiguration decryptionConfiguration) { |
| String fullErrorMessage = String.format("%s - %s Error: %s, but got [%s]", |
| encryptionConfiguration, decryptionConfiguration, errorMessage, exceptionMessage); |
| |
| synchronized (errorCollector) { |
| errorCollector.addError(new Throwable(fullErrorMessage)); |
| } |
| LOG.error(fullErrorMessage); |
| } |
| |
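/**
 * Records an error described by the given message in the error collector and logs it.
 */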
| private void addErrorToErrorCollectorAndLog(String errorMessage, EncryptionConfiguration encryptionConfiguration, |
| DecryptionConfiguration decryptionConfiguration) { |
| String fullErrorMessage = String.format("%s - %s Error: %s", |
| encryptionConfiguration, decryptionConfiguration, errorMessage); |
| |
| synchronized (errorCollector) { |
| errorCollector.addError(new Throwable(fullErrorMessage)); |
| } |
| LOG.error(fullErrorMessage); |
| } |
| |
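/**
 * Records an error caused by the given exception in the error collector, logs it and prints
 * the stack trace.
 */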
| private void addErrorToErrorCollectorAndLog(String errorMessage, Throwable exception, |
| EncryptionConfiguration encryptionConfiguration, |
| DecryptionConfiguration decryptionConfiguration) { |
| String errorMessageWithExceptionDetails = String.format("%s %s %s", errorMessage, exception.getClass().getName(), |
| exception.getMessage()); |
| addErrorToErrorCollectorAndLog(errorMessageWithExceptionDetails, |
| encryptionConfiguration, decryptionConfiguration); |
| exception.printStackTrace(); |
| } |
| } |