PARQUET-1396: Example of using EncryptionPropertiesFactory and DecryptionPropertiesFactory (#808)
* Parquet-1396: Example of using EncryptionPropertiesFactory and DecryptionPropertiesFactory
* Address review feedback
* Remove ExtType and add metadata to Type directly
* Use Configuration to pass the setting
* Address feedback
* Replace file.toString() with file.getPath()
* Address feedback
* fix build error
* Address more review feedback
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
index 324775b..f5613a2 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
@@ -118,6 +118,11 @@
}
};
}
+
+ @Override
+ public String getPath() {
+ throw new UnsupportedOperationException();
+ }
};
private static class ValueGenerator {
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
index 2d6de44..e1558ce 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
@@ -31,4 +31,5 @@
long defaultBlockSize();
+ String getPath();
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 41fcfb6..6be27e4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -544,8 +544,8 @@
public synchronized static MemoryManager getMemoryManager() {
return memoryManager;
}
-
- private static FileEncryptionProperties createEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
+
+ public static FileEncryptionProperties createEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
WriteContext fileWriteContext) {
EncryptionPropertiesFactory cryptoFactory = EncryptionPropertiesFactory.loadFactory(fileHadoopConfig);
if (null == cryptoFactory) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index ecc12de..c571afd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -279,6 +279,11 @@
WriteSupport.WriteContext writeContext = writeSupport.init(conf);
MessageType schema = writeContext.getSchema();
+ // encryptionProperties could be built from the implementation of EncryptionPropertiesFactory when it is attached.
+ if (encryptionProperties == null) {
+ encryptionProperties = ParquetOutputFormat.createEncryptionProperties(conf, new Path(file.getPath()), writeContext);
+ }
+
ParquetFileWriter fileWriter = new ParquetFileWriter(
file, schema, mode, rowGroupSize, maxPaddingSize,
encodingProps.getColumnIndexTruncateLength(), encodingProps.getStatisticsTruncateLength(),
@@ -640,4 +645,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
index 4740fd4..30ac50e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
@@ -94,6 +94,11 @@
}
@Override
+ public String getPath() {
+ return toString();
+ }
+
+ @Override
public String toString() {
return path.toString();
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
new file mode 100644
index 0000000..17fda97
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.propertiesfactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.format.EncryptionAlgorithm;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class SchemaControlEncryptionTest {
+
+ private final static Log LOG = LogFactory.getLog(SchemaControlEncryptionTest.class);
+ private final static int numRecord = 1000;
+ private Random rnd = new Random(5);
+
+  // In this test we use a map to tell the WriteSupport which columns to encrypt with which key. In real use
+  // cases, people can use whatever mechanism is convenient, depending on how they obtain this information;
+  // for example, it could be stored in HMS or another metastore.
+ private Map<String, Map<String, Object>> cryptoMetadata = new HashMap<>();
+ private Map<String, Object[]> testData = new HashMap<>();
+
+ @Before
+ public void generateTestData() {
+ String[] names = new String[numRecord];
+ Long[] ages = new Long[numRecord];
+ String[] linkedInWebs = new String[numRecord];
+ String[] twitterWebs = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ names[i] = getString();
+ ages[i] = getLong();
+ linkedInWebs[i] = getString();
+ twitterWebs[i] = getString();
+ }
+
+ testData.put("Name", names);
+ testData.put("Age", ages);
+ testData.put("LinkedIn", linkedInWebs);
+ testData.put("Twitter", twitterWebs);
+ }
+
+ @Test
+ public void testEncryptionDefault() throws Exception {
+ Configuration conf = new Configuration();
+ runTest(conf);
+ }
+
+ @Test
+ public void testEncryptionGcm() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(SchemaCryptoPropertiesFactory.CONF_ENCRYPTION_ALGORITHM, ParquetCipher.AES_GCM_V1.toString());
+ runTest(conf);
+ }
+
+ @Test
+ public void testEncryptionGcmCtr() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(SchemaCryptoPropertiesFactory.CONF_ENCRYPTION_ALGORITHM, ParquetCipher.AES_GCM_CTR_V1.toString());
+ runTest(conf);
+ }
+
+ @Test
+ public void testEncryptionWithFooter() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(SchemaCryptoPropertiesFactory.CONF_ENCRYPTION_FOOTER, true);
+ runTest(conf);
+ }
+
+ private void runTest(Configuration conf ) throws Exception {
+ conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
+ SchemaCryptoPropertiesFactory.class.getName());
+ String file = createTempFile("test");
+ markEncryptColumns();
+ encryptParquetFile(file, conf);
+ decryptParquetFileAndValid(file, conf);
+ }
+
+ private void markEncryptColumns() {
+ Map<String, Object> ageMetadata = new HashMap<>();
+ ageMetadata.put("columnKeyMetaData", "age_key_id");
+ cryptoMetadata.put("Age", ageMetadata);
+
+ Map<String, Object> linkMetadata = new HashMap<>();
+ linkMetadata.put("columnKeyMetaData", "link_key_id");
+ cryptoMetadata.put("LinkedIn", linkMetadata);
+ }
+
+ private String encryptParquetFile(String file, Configuration conf) throws IOException {
+ MessageType schema = new MessageType("schema",
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(REQUIRED, INT64, "Age"),
+ new GroupType(OPTIONAL, "WebLinks",
+ new PrimitiveType(REPEATED, BINARY, "LinkedIn"),
+ new PrimitiveType(REPEATED, BINARY, "Twitter")));
+
+ conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+ Path path = new Path(file);
+ Builder builder = new Builder(path);
+ builder.withConf(conf);
+
+ try (ParquetWriter writer = builder.build()) {
+ for (int i = 0; i < 1000; i++) {
+ SimpleGroup g = new SimpleGroup(schema);
+ g.add("Name", (String)testData.get("Name")[i]);
+ g.add("Age", (Long)testData.get("Age")[i]);
+ Group links = g.addGroup("WebLinks");
+ links.add(0, (String)testData.get("LinkedIn")[i]);
+ links.add(1, (String)testData.get("Twitter")[i]);
+ writer.write(g);
+ }
+ }
+
+ return file;
+ }
+
+ private void decryptParquetFileAndValid(String file, Configuration conf) throws IOException {
+ ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file)).withConf(conf).build();
+ for (int i = 0; i < numRecord; i++) {
+ Group group = reader.read();
+ assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
+ assertEquals(testData.get("Age")[i], group.getLong("Age", 0));
+
+ Group subGroup = group.getGroup("WebLinks", 0);
+ assertArrayEquals(subGroup.getBinary("LinkedIn", 0).getBytes(), ((String)testData.get("LinkedIn")[i]).getBytes());
+ assertArrayEquals(subGroup.getBinary("Twitter", 0).getBytes(), ((String)testData.get("Twitter")[i]).getBytes());
+ }
+ reader.close();
+ }
+
+ private static String createTempFile(String prefix) {
+ try {
+ return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+ private static long getLong() {
+ return ThreadLocalRandom.current().nextLong(1000);
+ }
+
+ private String getString() {
+ char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 100; i++) {
+ sb.append(chars[rnd.nextInt(10)]);
+ }
+ return sb.toString();
+ }
+
+ private class CryptoGroupWriteSupport extends GroupWriteSupport {
+
+ public CryptoGroupWriteSupport() {
+ super();
+ }
+
+ @Override
+ public WriteContext init(Configuration conf) {
+ WriteContext writeContext = super.init(conf);
+ MessageType schema = writeContext.getSchema();
+ List<ColumnDescriptor> columns = schema.getColumns();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There are " + columns.size() + " columns");
+ }
+
+ for (ColumnDescriptor column : columns) {
+ setMetadata(column, conf);
+ }
+
+ return writeContext;
+ }
+
+ private void setMetadata(ColumnDescriptor column, Configuration conf) {
+ String columnShortName = column.getPath()[column.getPath().length - 1];
+ if (cryptoMetadata.containsKey(columnShortName) &&
+ cryptoMetadata.get(columnShortName).get("columnKeyMetaData") != null) {
+ String columnKey = String.join(".", column.getPath());
+ conf.set(SchemaCryptoPropertiesFactory.PATH_NAME_PREFIX + columnKey,
+ cryptoMetadata.get(columnShortName).get("columnKeyMetaData").toString());
+ }
+ }
+ }
+
+ public class Builder extends ParquetWriter.Builder<Group, Builder> {
+
+ private Builder(Path file) {
+ super(file);
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ protected WriteSupport<Group> getWriteSupport(Configuration conf) {
+ return new CryptoGroupWriteSupport();
+ }
+ }
+}
+
+
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaCryptoPropertiesFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaCryptoPropertiesFactory.java
new file mode 100644
index 0000000..446fc6d
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaCryptoPropertiesFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.propertiesfactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaCryptoPropertiesFactory implements EncryptionPropertiesFactory, DecryptionPropertiesFactory {
+
+ public static final String PATH_NAME_PREFIX = "column_encryption_1178_";
+
+ private static Logger log = LoggerFactory.getLogger(SchemaCryptoPropertiesFactory.class);
+
+ public static final String CONF_ENCRYPTION_ALGORITHM = "parquet.encryption.algorithm";
+ public static final String CONF_ENCRYPTION_FOOTER = "parquet.encrypt.footer";
+ private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
+ 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
+ private static final byte[] FOOTER_KEY_METADATA = "footkey".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
+ 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
+ private static final byte[] COL_KEY_METADATA = "col".getBytes(StandardCharsets.UTF_8);
+
+ @Override
+ public FileEncryptionProperties getFileEncryptionProperties(Configuration conf, Path tempFilePath,
+ WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
+ MessageType schema = fileWriteContext.getSchema();
+ List<String[]> paths = schema.getPaths();
+ if (paths == null || paths.isEmpty()) {
+ throw new ParquetCryptoRuntimeException("Null or empty fields is found");
+ }
+
+ Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>();
+
+ for (String[] path : paths) {
+ getColumnEncryptionProperties(path, columnPropertyMap, conf);
+ }
+
+ if (columnPropertyMap.size() == 0) {
+ log.debug("No column is encrypted. Returning null so that Parquet can skip. Empty properties will cause Parquet exception");
+ return null;
+ }
+
+ /**
+ * Why we still need footerKeyMetadata even withEncryptedFooter as false? According to the
+ * 'Plaintext Footer' section of
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md, the plaintext footer
+ * is signed in order to prevent tampering with the FileMetaData contents. So footerKeyMetadata
+ * is always needed. This signature will be verified if parquet-mr code is with parquet-1178.
+ * Otherwise, it will be ignored.
+ */
+ boolean shouldEncryptFooter = getEncryptFooter(conf);
+ FileEncryptionProperties.Builder encryptionPropertiesBuilder =
+ FileEncryptionProperties.builder(FOOTER_KEY)
+ .withFooterKeyMetadata(FOOTER_KEY_METADATA)
+ .withAlgorithm(getParquetCipherOrDefault(conf))
+ .withEncryptedColumns(columnPropertyMap);
+ if (!shouldEncryptFooter) {
+ encryptionPropertiesBuilder = encryptionPropertiesBuilder.withPlaintextFooter();
+ }
+ FileEncryptionProperties encryptionProperties = encryptionPropertiesBuilder.build();
+ log.info(
+ "FileEncryptionProperties is built with, algorithm:{}, footerEncrypted:{}",
+ encryptionProperties.getAlgorithm(),
+ encryptionProperties.encryptedFooter());
+ return encryptionProperties;
+ }
+
+ private ParquetCipher getParquetCipherOrDefault(Configuration conf) {
+ String algorithm = conf.get(CONF_ENCRYPTION_ALGORITHM, "AES_GCM_CTR_V1");
+ log.debug("Encryption algorithm is {}", algorithm);
+ return ParquetCipher.valueOf(algorithm.toUpperCase());
+ }
+
+ private boolean getEncryptFooter(Configuration conf) {
+ boolean encryptFooter = conf.getBoolean(CONF_ENCRYPTION_FOOTER, false);
+ log.debug("Encrypt Footer: {}", encryptFooter);
+ return encryptFooter;
+ }
+
+ private void getColumnEncryptionProperties(String[] path, Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap,
+ Configuration conf) throws ParquetCryptoRuntimeException {
+ String pathName = String.join(".", path);
+ String columnKeyName = conf.get(PATH_NAME_PREFIX + pathName, null);
+ if (columnKeyName != null) {
+ ColumnPath columnPath = ColumnPath.get(path);
+ ColumnEncryptionProperties colEncProp = ColumnEncryptionProperties.builder(columnPath)
+ .withKey(COL_KEY)
+ .withKeyMetaData(COL_KEY_METADATA)
+ .build();
+ columnPropertyMap.put(columnPath, colEncProp);
+ }
+ }
+
+ @Override
+ public FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath)
+ throws ParquetCryptoRuntimeException {
+ DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+ keyRetriever.putKey("footkey", FOOTER_KEY);
+ keyRetriever.putKey("col", COL_KEY);
+ return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withKeyRetriever(keyRetriever).build();
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index 0569c42..22dfc43 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -114,6 +114,11 @@
public long defaultBlockSize() {
return file.defaultBlockSize();
}
+
+ @Override
+ public String getPath() {
+ return file.getPath();
+ }
}
private int pageSize = 1024;