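Add Pig read test (testStorer) for a Thrift-written Parquet file. [TODO] check deps: pig is added to parquet-thrift without a scope, joda-time with test scope.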
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 23adec6..10ade84 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.twitter</groupId>
<artifactId>parquet</artifactId>
@@ -67,6 +68,17 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pig</artifactId>
+ <version>0.11.1</version>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>1.6</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-pig</artifactId>
<version>${project.version}</version>
@@ -136,5 +148,5 @@
</executions>
</plugin>
</plugins>
- </build>
+ </build>
</project>
diff --git a/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java b/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java
index 53d759a..35160cd 100644
--- a/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java
+++ b/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java
@@ -32,6 +32,12 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.data.Tuple;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -47,6 +53,8 @@
import com.twitter.data.proto.tutorial.thrift.Name;
import com.twitter.data.proto.tutorial.thrift.Person;
import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
+import parquet.pig.ParquetLoader;
+import parquet.pig.ParquetStorer;
import parquet.thrift.test.RequiredListFixture;
import parquet.thrift.test.RequiredMapFixture;
import parquet.thrift.test.RequiredPrimitiveFixture;
@@ -90,6 +98,74 @@
shouldDoProjection(conf,toWrite,toRead,AddressBook.class);
}
+
+
+  /** Writes a single Person record to target/out.parquet, replacing any existing file. */
+  public void writeIt() throws Exception{
+    final Path parquetFile = new Path("target/out.parquet");
+    final Configuration conf = new Configuration();
+    final FileSystem fs = parquetFile.getFileSystem(conf);
+    if (fs.exists(parquetFile)) {
+      fs.delete(parquetFile, true);
+    }
+
+    // Serialize the record first so the writer is only open while it is being used.
+    final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final Person a = new Person(new Name("Bob", "Roberts"), 0, "bob.roberts@example.com",
+        Arrays.asList(new PhoneNumber("1234567890")));
+    a.write(protocolFactory.getProtocol(new TIOStreamTransport(baos)));
+
+    final TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+    final ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, new TaskAttemptContext(conf, taskId), protocolFactory, Person.class);
+    try {
+      w.write(new BytesWritable(baos.toByteArray()));
+    } finally {
+      w.close(); // release the file handle even when the write fails
+    }
+  }
+
+  /**
+   * Reads the Thrift-written Parquet file produced by {@link #writeIt()} back through
+   * Pig's {@link ParquetLoader}, projects the {@code email} column and checks each
+   * tuple that reaches the mock storage.
+   *
+   * @throws Exception if the fixture cannot be written or the Pig job fails
+   */
+  @Test
+  public void testStorer() throws ExecException, Exception {
+    writeIt();
+    // Must match the path writeIt() writes to.
+    String out = "target/out.parquet";
+    Properties props = new Properties();
+    props.setProperty("parquet.compression", "uncompressed");
+    props.setProperty("parquet.page.size", "1000");
+    PigServer pigServer = new PigServer(ExecType.LOCAL, props);
+    Storage.Data data = Storage.resetData(pigServer);
+    pigServer.setBatchOn();
+
+    pigServer.registerQuery("B = LOAD '"+out+"' USING "+ParquetLoader.class.getName()+"();");
+    pigServer.registerQuery("C = foreach B generate email;");
+    pigServer.registerQuery("Store C into 'out' using mock.Storage();");
+    // Run the batch exactly once: calling executeBatch() again in the error path would
+    // submit the batch a second time instead of reporting the job that failed.
+    ExecJob job = pigServer.executeBatch().get(0);
+    if (job.getStatus() != ExecJob.JOB_STATUS.COMPLETED) {
+      throw new RuntimeException("Job failed", job.getException());
+    }
+
+    List<Tuple> result = data.get("out");
+    // writeIt() stores exactly one Person, so exactly one projected row is expected;
+    // without this check an empty result would pass vacuously.
+    assertEquals(1, result.size());
+    for (Tuple tuple : result) {
+      // NOTE(review): the query projects email, yet the expected value looks like the
+      // name struct -- confirm which one is intended.
+      assertEquals("(Bob,Roberts)", tuple.get(0).toString());
+    }
+  }
+
+
@Test
public void testPullingInRequiredStructWithFilter() throws Exception {
final String projectionFilterDesc = "persons/{id};persons/email";