initial commit
diff --git a/README.md b/README.md
index 76e6df5..e69de29 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +0,0 @@
-# parquet-writesupport-extensions
-This repo includes extension of Parquet WriteSupport Extensions. For example, extension for org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml
new file mode 100644
index 0000000..984bd21
--- /dev/null
+++ b/dependency-reduced-pom.xml
@@ -0,0 +1,168 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.uber.parquetwritesupports</groupId>
+ <artifactId>ParquetWriteSupports</artifactId>
+ <version>2.0.0.1</version>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/java</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>${project.basedir}/src/test/java</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </testResource>
+ <testResource>
+ <directory>${project.basedir}/src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <args>
+ <arg>-target:jvm-${java.version}</arg>
+ </args>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>buildAntlr</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-maven-plugin</artifactId>
+ <version>4.5.3</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>antlr4</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <visitor>true</visitor>
+ <sourceDirectory>src/main/antlr4</sourceDirectory>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <tasks>
+ <move>
+ <fileset />
+ </move>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+ <repositories>
+ <repository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </repository>
+ <repository>
+ <releases />
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <id>central</id>
+ <name>Maven Repository</name>
+ <url>https://repo1.maven.org/maven2</url>
+ </repository>
+ </repositories>
+ <pluginRepositories>
+ <pluginRepository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </pluginRepository>
+ </pluginRepositories>
+ <properties>
+ <java.version>1.8</java.version>
+ <scala.version>2.11.8</scala.version>
+ <encoding>UTF-8</encoding>
+ <scala.libversion>2.11</scala.libversion>
+ <spark.version>2.1.3</spark.version>
+ </properties>
+</project>
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..19422f4
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,305 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.uber.parquetwritesupports</groupId>
+ <artifactId>ParquetWriteSupports</artifactId>
+ <version>2.0.0.1</version>
+ <properties>
+ <scala.libversion>2.11</scala.libversion>
+ <scala.version>2.11.8</scala.version>
+ <spark.version>2.1.3</spark.version>
+ <java.version>1.8</java.version>
+ <encoding>UTF-8</encoding>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </repository>
+ <repository>
+ <id>central</id>
+      <!-- This should be at the top: it makes Maven try the central repo first and then the others, hence faster dependency resolution -->
+ <name>Maven Repository</name>
+ <url>https://repo1.maven.org/maven2</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <pluginRepositories>
+ <pluginRepository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </pluginRepository>
+ </pluginRepositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.libversion}</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.libversion}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
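+    <!-- NOTE: the -SNAPSHOT Parquet artifacts below are not published to Maven
+         Central; they assume a locally built parquet-mr that includes the
+         PARQUET-1178 / PARQUET-1396 changes referenced in this repo's sources. -->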
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-format</artifactId>
+ <version>2.5.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>1.10.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.10.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-jackson</artifactId>
+ <version>1.10.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-common</artifactId>
+ <version>1.10.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-encoding</artifactId>
+ <version>1.10.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-tools</artifactId>
+ <version>1.10.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.libversion}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/java</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+
+ <testResources>
+ <testResource>
+ <directory>${project.basedir}/src/test/java</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </testResource>
+ <testResource>
+ <directory>${project.basedir}/src/test/resources</directory>
+ </testResource>
+ </testResources>
+
+
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <args>
+ <arg>-target:jvm-${java.version}</arg>
+ </args>
+ </configuration>
+ </plugin>
+
+ <!-- Set a compiler level -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <!--Build assembly jar-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+    <!-- Run "mvn compile -DskipTests -PbuildAntlr" to regenerate the ANTLR sources whenever the grammar changes -->
+ <profile>
+ <id>buildAntlr</id>
+ <build>
+ <plugins>
+ <plugin>
+ <!-- command to generate sources: mvn generate-sources -->
+ <!-- mvn antlr4:help -Ddetail=true -->
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-maven-plugin</artifactId>
+ <version>4.5.3</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>antlr4</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <visitor>true</visitor>
+ <sourceDirectory>src/main/antlr4</sourceDirectory>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <tasks>
+ <move todir="src/main/java/com/uber/hadoopprof/antlr4/generated" overwrite="true">
+ <fileset dir="${project.build.directory}/generated-sources/antlr4/" />
+ </move>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/products.json b/products.json
new file mode 100644
index 0000000..c02d0ba
--- /dev/null
+++ b/products.json
@@ -0,0 +1,3 @@
+{"product":"table", "price":30}
+{"product":"chair", "price":10}
+{"product":"table", "price":13}
diff --git a/src/main/java/com/uber/ParquetHelloWorld.java b/src/main/java/com/uber/ParquetHelloWorld.java
new file mode 100644
index 0000000..3a07bff
--- /dev/null
+++ b/src/main/java/com/uber/ParquetHelloWorld.java
@@ -0,0 +1,40 @@
+package com.uber;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.execution.datasources.parquet.SchemaUtil;
+
+/**
+ * Demo app that writes and reads a Parquet file whose schema carries per-column
+ * encryption metadata. It depends on PARQUET-1178 (Parquet modular encryption)
+ * and PARQUET-1396 (merging ExtType into parquet-mr).
+ */
+public class ParquetHelloWorld {
+
+ public static void main(String[] args) throws Exception {
+ SparkSession spark = SparkSession
+ .builder()
+                .appName("ParquetHelloWorld")
+ .config("spark.master", "local")
+ .config("spark.sql.parquet.enableVectorizedReader", false)
+ .config("parquet.crypto.encryptor.decryptor.retriever.class",
+ "org.apache.parquet.crypto.SampleFileEncDecryptorRetriever")
+ .config("parquet.write.support.class",
+ org.apache.spark.sql.execution.datasources.parquet.CryptoParquetWriteSupport.class.getName())
+ .getOrCreate();
+
+ testColumnEncReadWrite(spark);
+ }
+
+ private static void testColumnEncReadWrite(SparkSession spark) {
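+        // The schema below marks "price" (long) as encrypted with columnKeyMetaData
+        // "AAA=", while "product" (string) carries encrypted=false.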
+ String schemaString = "{\"type\":\"struct\",\"fields\":[{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"encrypted\": true,\"columnKeyMetaData\": \"AAA=\"}},{\"name\":\"product\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"encrypted\": false}}]}";
+        StructType schema = SchemaUtil.parseString(schemaString);
+ JavaRDD<Row> rawData = spark.read().json("products.json").toJavaRDD();
+ Dataset<Row> dataFrame = spark.createDataFrame(rawData, schema);
+
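+        // Round-trip: write the DataFrame with the crypto write support, then read
+        // the file back and show it to verify the data survives intact.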
+ dataFrame.write().mode("overwrite").parquet("file1");
+
+ spark.read().parquet("file1").show();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/spark/sql/execution/datasources/parquet/CryptoParquetWriteSupport.scala b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/CryptoParquetWriteSupport.scala
new file mode 100644
index 0000000..e3f2c1d
--- /dev/null
+++ b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/CryptoParquetWriteSupport.scala
@@ -0,0 +1,20 @@
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
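+/**
+ * A ParquetWriteSupport that builds the write context with
+ * ParquetMetadataSchemaConverter, so that per-field Spark metadata (for example the
+ * "encrypted" flag set in ParquetHelloWorld) is carried into the Parquet schema.
+ * Note: this assumes a ParquetWriteSupport variant that exposes createContext and
+ * writeFields for overriding (see PARQUET-1178 / PARQUET-1396).
+ */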
+class CryptoParquetWriteSupport extends ParquetWriteSupport {
+
+ override def init(configuration: Configuration): WriteContext = {
+ val converter = new ParquetMetadataSchemaConverter(configuration)
+ createContext(configuration, converter)
+ }
+
+ override def writeFields(
+ row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    // TODO: add data masking for fields
+ super.writeFields(row, schema, fieldWriters)
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetMetadataSchemaConverter.scala b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetMetadataSchemaConverter.scala
new file mode 100644
index 0000000..10deafc
--- /dev/null
+++ b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetMetadataSchemaConverter.scala
@@ -0,0 +1,61 @@
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.util
+import java.util.Map
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.{ExtType, MessageType, Type, Types}
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+
+import scala.collection.JavaConversions._
+
+/**
+ * In addition to the existing functionality of ParquetSchemaConverter, this class
+ * passes each field's metadata in the StructType to the corresponding field of the
+ * MessageType.
+ *
+ * It depends on the ExtType class defined at the link below; PARQUET-1396 was opened
+ * to merge ExtType into the parquet-mr repo.
+ * https://github.com/shangxinli/parquet-mr/blob/encryption/parquet-column/src/main/java/org/apache/parquet/schema/ExtType.java
+ */
+class ParquetMetadataSchemaConverter(conf: Configuration) extends ParquetSchemaConverter(conf) {
+
+  /**
+   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
+   */
+ override def convert(catalystSchema: StructType): MessageType = {
+ Types
+ .buildMessage()
+ .addFields(catalystSchema.map(convertFieldWithMetadata): _*)
+ .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ }
+
+  def convertFieldWithMetadata(field: StructField): Type = {
+    // Wrap the converted Parquet type in an ExtType and attach the Spark field's
+    // metadata (e.g. the "encrypted" flag) to it.
+    val extField = new ExtType[Any](convertField(field))
+    val metaData = new ExtMetadataBuilder().withMetadata(field.metadata).getMap
+    extField.setMetadata(metaData)
+    extField
+  }
+
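+  /**
+   * Looks up the metadata map for the named field in the given schema
+   * (currently not referenced within this class).
+   */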
+  private def getMetadata(schema: StructType, fieldName: String): Map[String, Any] = {
+    schema.fields.foreach { field =>
+      if (field.name != null && field.name.equals(fieldName)) {
+        return new ExtMetadataBuilder().withMetadata(field.metadata).getMap
+      }
+    }
+    new util.HashMap[String, Any]()
+  }
+}
+
+/**
+ * Due to the access modifier of getMap() in Spark's MetadataBuilder, this subclass
+ * is needed so that getMap can be accessed by the class above.
+ */
+class ExtMetadataBuilder extends MetadataBuilder {
+
+  override def getMap = super.getMap
+}
diff --git a/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SchemaUtil.scala b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SchemaUtil.scala
new file mode 100644
index 0000000..7b40f47
--- /dev/null
+++ b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SchemaUtil.scala
@@ -0,0 +1,12 @@
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This object exists so that StructType.fromString() can be used outside the
+ * org.apache.spark.sql.types namespace (see ParquetHelloWorld for a caller).
+ */
+object SchemaUtil {
+  def parseString(s: String): StructType = StructType.fromString(s)
+}
\ No newline at end of file
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e4fcb01
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file