PARQUET-1396: Example of using EncryptionPropertiesFactory and DecryptionPropertiesFactory (#808)

* PARQUET-1396: Example of using EncryptionPropertiesFactory and DecryptionPropertiesFactory

* Address feedback

* Remove ExtType and add metadata to Type directly

* Use Configuration to pass the setting

* Address feedback

* Replace file.toString() with file.getPath()

* Address feedback

* Fix build error

* Address more feedback
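
Client-side wiring, as exercised by the new SchemaControlEncryptionTest (a
minimal sketch; "file", "group", and the use of ExampleParquetWriter are
illustrative assumptions, not part of this patch):

    Configuration conf = new Configuration();
    // Register a class implementing EncryptionPropertiesFactory (and, for
    // reads, DecryptionPropertiesFactory); Parquet loads it reflectively.
    conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
        SchemaCryptoPropertiesFactory.class.getName());

    // Write path: ParquetWriter asks the factory for FileEncryptionProperties
    // when none were passed explicitly. (The example schema is assumed to be
    // registered already, e.g. via GroupWriteSupport.setSchema(schema, conf).)
    try (ParquetWriter<Group> writer =
        ExampleParquetWriter.builder(new Path(file)).withConf(conf).build()) {
      writer.write(group);
    }

    // Read path: ParquetReader asks the factory for FileDecryptionProperties.
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path(file)).withConf(conf).build()) {
      Group g = reader.read();
    }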
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
index 324775b..f5613a2 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
@@ -118,6 +118,11 @@
         }
       };
     }
+
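+    // This in-memory benchmark file has no backing path, so none can be reported.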
+    @Override
+    public String getPath() {
+      throw new UnsupportedOperationException();
+    }
   };
 
   private static class ValueGenerator {
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
index 2d6de44..e1558ce 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
@@ -31,4 +31,5 @@
 
   long defaultBlockSize();
 
+  /**
+   * @return the path of this output file as a String; implementations that have
+   *         no backing path may throw UnsupportedOperationException
+   */
+  String getPath();
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 41fcfb6..6be27e4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -544,8 +544,8 @@
   public synchronized static MemoryManager getMemoryManager() {
     return memoryManager;
   }
-  
-  private static FileEncryptionProperties createEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath, 
+
+  public static FileEncryptionProperties createEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
       WriteContext fileWriteContext) {
     EncryptionPropertiesFactory cryptoFactory = EncryptionPropertiesFactory.loadFactory(fileHadoopConfig);
     if (null == cryptoFactory) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index ecc12de..c571afd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -279,6 +279,11 @@
     WriteSupport.WriteContext writeContext = writeSupport.init(conf);
     MessageType schema = writeContext.getSchema();
 
+    // If no explicit encryption properties were provided, try to build them via an
+    // EncryptionPropertiesFactory implementation, if one is configured.
+    if (encryptionProperties == null) {
+      encryptionProperties = ParquetOutputFormat.createEncryptionProperties(conf, new Path(file.getPath()), writeContext);
+    }
+
     ParquetFileWriter fileWriter = new ParquetFileWriter(
       file, schema, mode, rowGroupSize, maxPaddingSize,
       encodingProps.getColumnIndexTruncateLength(), encodingProps.getStatisticsTruncateLength(),
@@ -640,4 +645,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
index 4740fd4..30ac50e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
@@ -94,6 +94,11 @@
   }
 
   @Override
+  public String getPath() {
+    return toString();
+  }
+
+  @Override
   public String toString() {
     return path.toString();
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
new file mode 100644
index 0000000..17fda97
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.propertiesfactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class SchemaControlEncryptionTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SchemaControlEncryptionTest.class);
+  private static final int numRecord = 1000;
+  private Random rnd = new Random(5);
+  
+  // In this test we use a map to tell the WriteSupport which columns to encrypt with which key,
+  // e.g. {"Age" -> {"columnKeyMetaData" -> "age_key_id"}}. In real use cases this mapping can be
+  // stored wherever is convenient, for example in HMS or another metastore.
+  private Map<String, Map<String, Object>> cryptoMetadata = new HashMap<>();
+  private Map<String, Object[]> testData = new HashMap<>();
+
+  @Before
+  public void generateTestData() {
+    String[] names = new String[numRecord];
+    Long[] ages = new Long[numRecord];
+    String[] linkedInWebs = new String[numRecord];
+    String[] twitterWebs = new String[numRecord];
+    for (int i = 0; i < numRecord; i++) {
+      names[i] = getString();
+      ages[i] = getLong();
+      linkedInWebs[i] = getString();
+      twitterWebs[i] = getString();
+    }
+
+    testData.put("Name", names);
+    testData.put("Age", ages);
+    testData.put("LinkedIn", linkedInWebs);
+    testData.put("Twitter", twitterWebs);
+  }
+
+  @Test
+  public void testEncryptionDefault() throws Exception {
+    Configuration conf = new Configuration();
+    runTest(conf);
+  }
+
+  @Test
+  public void testEncryptionGcm() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(SchemaCryptoPropertiesFactory.CONF_ENCRYPTION_ALGORITHM, ParquetCipher.AES_GCM_V1.toString());
+    runTest(conf);
+  }
+
+  @Test
+  public void testEncryptionGcmCtr() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(SchemaCryptoPropertiesFactory.CONF_ENCRYPTION_ALGORITHM, ParquetCipher.AES_GCM_CTR_V1.toString());
+    runTest(conf);
+  }
+
+  @Test
+  public void testEncryptionWithFooter() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(SchemaCryptoPropertiesFactory.CONF_ENCRYPTION_FOOTER, true);
+    runTest(conf);
+  }
+
+  private void runTest(Configuration conf) throws Exception {
+    conf.set(EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
+      SchemaCryptoPropertiesFactory.class.getName());
+    String file = createTempFile("test");
+    markEncryptColumns();
+    encryptParquetFile(file, conf);
+    decryptParquetFileAndValidate(file, conf);
+  }
+
+  private void markEncryptColumns() {
+    Map<String, Object> ageMetadata = new HashMap<>();
+    ageMetadata.put("columnKeyMetaData", "age_key_id");
+    cryptoMetadata.put("Age", ageMetadata);
+
+    Map<String, Object> linkMetadata = new HashMap<>();
+    linkMetadata.put("columnKeyMetaData", "link_key_id");
+    cryptoMetadata.put("LinkedIn", linkMetadata);
+  }
+
+  private String encryptParquetFile(String file, Configuration conf) throws IOException {
+    MessageType schema = new MessageType("schema",
+      new PrimitiveType(REQUIRED, BINARY, "Name"),
+      new PrimitiveType(REQUIRED, INT64, "Age"),
+      new GroupType(OPTIONAL, "WebLinks",
+        new PrimitiveType(REPEATED, BINARY, "LinkedIn"),
+        new PrimitiveType(REPEATED, BINARY, "Twitter")));
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+    Path path = new Path(file);
+    Builder builder = new Builder(path);
+    builder.withConf(conf);
+
+    try (ParquetWriter<Group> writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("Name", (String)testData.get("Name")[i]);
+        g.add("Age", (Long)testData.get("Age")[i]);
+        Group links = g.addGroup("WebLinks");
+        links.add(0, (String)testData.get("LinkedIn")[i]);
+        links.add(1, (String)testData.get("Twitter")[i]);
+        writer.write(g);
+      }
+    }
+
+    return file;
+  }
+
+  private void decryptParquetFileAndValidate(String file, Configuration conf) throws IOException {
+    try (ParquetReader<Group> reader =
+        ParquetReader.builder(new GroupReadSupport(), new Path(file)).withConf(conf).build()) {
+      for (int i = 0; i < numRecord; i++) {
+        Group group = reader.read();
+        assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
+        assertEquals(testData.get("Age")[i], group.getLong("Age", 0));
+
+        Group subGroup = group.getGroup("WebLinks", 0);
+        assertArrayEquals(subGroup.getBinary("LinkedIn", 0).getBytes(), ((String)testData.get("LinkedIn")[i]).getBytes());
+        assertArrayEquals(subGroup.getBinary("Twitter", 0).getBytes(), ((String)testData.get("Twitter")[i]).getBytes());
+      }
+    }
+  }
+
+  private static String createTempFile(String prefix) {
+    try {
+      return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private static long getLong() {
+    return ThreadLocalRandom.current().nextLong(1000);
+  }
+
+  private String getString() {
+    char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 100; i++) {
+      sb.append(chars[rnd.nextInt(chars.length)]);
+    }
+    return sb.toString();
+  }
+
+  private class CryptoGroupWriteSupport extends GroupWriteSupport {
+
+    public CryptoGroupWriteSupport() {
+      super();
+    }
+
+    @Override
+    public WriteContext init(Configuration conf) {
+      WriteContext writeContext = super.init(conf);
+      MessageType schema = writeContext.getSchema();
+      List<ColumnDescriptor> columns = schema.getColumns();
+
+      LOG.debug("There are {} columns", columns.size());
+
+      for (ColumnDescriptor column : columns) {
+        setMetadata(column, conf);
+      }
+
+      return writeContext;
+    }
+
+    private void setMetadata(ColumnDescriptor column, Configuration conf) {
+      String columnShortName = column.getPath()[column.getPath().length - 1];
+      if (cryptoMetadata.containsKey(columnShortName) &&
+        cryptoMetadata.get(columnShortName).get("columnKeyMetaData") != null) {
+        String columnKey = String.join(".", column.getPath());
+        conf.set(SchemaCryptoPropertiesFactory.PATH_NAME_PREFIX + columnKey,
+          cryptoMetadata.get(columnShortName).get("columnKeyMetaData").toString());
+      }
+    }
+  }
+
+  public class Builder extends ParquetWriter.Builder<Group, Builder> {
+
+    private Builder(Path file) {
+      super(file);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    protected WriteSupport<Group> getWriteSupport(Configuration conf) {
+      return new CryptoGroupWriteSupport();
+    }
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaCryptoPropertiesFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaCryptoPropertiesFactory.java
new file mode 100644
index 0000000..446fc6d
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaCryptoPropertiesFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.propertiesfactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaCryptoPropertiesFactory implements EncryptionPropertiesFactory, DecryptionPropertiesFactory {
+
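+  // Columns are flagged for encryption through Configuration entries of the form
+  // PATH_NAME_PREFIX + <dotted column path> -> <column key id>; the write side of
+  // this contract is CryptoGroupWriteSupport.setMetadata in SchemaControlEncryptionTest.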
+  public static final String PATH_NAME_PREFIX = "column_encryption_1178_";
+
+  private static final Logger log = LoggerFactory.getLogger(SchemaCryptoPropertiesFactory.class);
+
+  public static final String CONF_ENCRYPTION_ALGORITHM = "parquet.encryption.algorithm";
+  public static final String CONF_ENCRYPTION_FOOTER = "parquet.encrypt.footer";
+  private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
+    0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
+  private static final byte[] FOOTER_KEY_METADATA = "footkey".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
+    0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
+  private static final byte[] COL_KEY_METADATA = "col".getBytes(StandardCharsets.UTF_8);
+
+  @Override
+  public FileEncryptionProperties getFileEncryptionProperties(Configuration conf, Path tempFilePath,
+                                                              WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
+    MessageType schema = fileWriteContext.getSchema();
+    List<String[]> paths = schema.getPaths();
+    if (paths == null || paths.isEmpty()) {
+      throw new ParquetCryptoRuntimeException("Null or empty fields is found");
+    }
+
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>();
+
+    for (String[] path : paths) {
+      addColumnEncryptionProperties(path, columnPropertyMap, conf);
+    }
+
+    if (columnPropertyMap.isEmpty()) {
+      log.debug("No columns are encrypted. Returning null so Parquet skips encryption; empty encryption properties would cause a Parquet exception");
+      return null;
+    }
+
+    /*
+     * Why is footerKeyMetadata still needed even when the footer is not encrypted? According to
+     * the 'Plaintext Footer' section of
+     * https://github.com/apache/parquet-format/blob/encryption/Encryption.md, the plaintext
+     * footer is signed in order to prevent tampering with the FileMetaData contents, so
+     * footerKeyMetadata is always required. The signature is verified by parquet-mr versions
+     * that include PARQUET-1178 and ignored otherwise.
+     */
+    boolean shouldEncryptFooter = getEncryptFooter(conf);
+    FileEncryptionProperties.Builder encryptionPropertiesBuilder =
+      FileEncryptionProperties.builder(FOOTER_KEY)
+        .withFooterKeyMetadata(FOOTER_KEY_METADATA)
+        .withAlgorithm(getParquetCipherOrDefault(conf))
+        .withEncryptedColumns(columnPropertyMap);
+    if (!shouldEncryptFooter) {
+      encryptionPropertiesBuilder = encryptionPropertiesBuilder.withPlaintextFooter();
+    }
+    FileEncryptionProperties encryptionProperties = encryptionPropertiesBuilder.build();
+    log.info(
+      "FileEncryptionProperties is built with, algorithm:{}, footerEncrypted:{}",
+      encryptionProperties.getAlgorithm(),
+      encryptionProperties.encryptedFooter());
+    return encryptionProperties;
+  }
+
+  private ParquetCipher getParquetCipherOrDefault(Configuration conf) {
+    String algorithm = conf.get(CONF_ENCRYPTION_ALGORITHM, "AES_GCM_CTR_V1");
+    log.debug("Encryption algorithm is {}", algorithm);
+    return ParquetCipher.valueOf(algorithm.toUpperCase());
+  }
+
+  private boolean getEncryptFooter(Configuration conf) {
+    boolean encryptFooter = conf.getBoolean(CONF_ENCRYPTION_FOOTER, false);
+    log.debug("Encrypt Footer: {}", encryptFooter);
+    return encryptFooter;
+  }
+
+  private void addColumnEncryptionProperties(String[] path, Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap,
+                                             Configuration conf) throws ParquetCryptoRuntimeException {
+    String pathName = String.join(".", path);
+    String columnKeyName = conf.get(PATH_NAME_PREFIX + pathName, null);
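+    // A non-null key id means this column was flagged for encryption; the test
+    // then encrypts it with the fixed COL_KEY/COL_KEY_METADATA instead of
+    // resolving the id against a real key store.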
+    if (columnKeyName != null) {
+      ColumnPath columnPath = ColumnPath.get(path);
+      ColumnEncryptionProperties colEncProp = ColumnEncryptionProperties.builder(columnPath)
+        .withKey(COL_KEY)
+        .withKeyMetaData(COL_KEY_METADATA)
+        .build();
+      columnPropertyMap.put(columnPath, colEncProp);
+    }
+  }
+
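+  // The mock retriever maps the key metadata written above ("footkey", "col") back
+  // to the raw key bytes; withPlaintextFilesAllowed() lets the same reader open
+  // unencrypted files too.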
+  @Override
+  public FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath)
+    throws ParquetCryptoRuntimeException {
+    DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+    keyRetriever.putKey("footkey", FOOTER_KEY);
+    keyRetriever.putKey("col", COL_KEY);
+    return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withKeyRetriever(keyRetriever).build();
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index 0569c42..22dfc43 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -114,6 +114,11 @@
     public long defaultBlockSize() {
       return file.defaultBlockSize();
     }
+
+    @Override
+    public String getPath() {
+      return file.getPath();
+    }
   }
 
   private int pageSize = 1024;