add Pig read test for thrift-written Parquet files, [TODO] check scopes/versions of new pig and joda-time deps
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 23adec6..10ade84 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
@@ -67,6 +68,17 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.pig</groupId>
+      <artifactId>pig</artifactId>
+      <version>0.11.1</version>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>1.6</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-pig</artifactId>
       <version>${project.version}</version>
@@ -136,5 +148,5 @@
         </executions>
       </plugin>
     </plugins>
-  </build> 
+  </build>
 </project>
diff --git a/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java b/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java
index 53d759a..35160cd 100644
--- a/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java
+++ b/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadProjection.java
@@ -32,6 +32,11 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.data.Tuple;
 import org.apache.thrift.TBase;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -47,6 +52,8 @@
 import com.twitter.data.proto.tutorial.thrift.Name;
 import com.twitter.data.proto.tutorial.thrift.Person;
 import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
+import parquet.pig.ParquetLoader;
+import parquet.pig.ParquetStorer;
 import parquet.thrift.test.RequiredListFixture;
 import parquet.thrift.test.RequiredMapFixture;
 import parquet.thrift.test.RequiredPrimitiveFixture;
@@ -90,6 +97,74 @@
     shouldDoProjection(conf,toWrite,toRead,AddressBook.class);
   }
 
+
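+  // writes a single Person ("Bob", "Roberts") to target/out.parquet via ThriftToParquetFileWriter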
+  public void writeIt() throws Exception {
+    final Path parquetFile = new Path("target/out.parquet");
+    Configuration conf = new Configuration();
+    final FileSystem fs = parquetFile.getFileSystem(conf);
+    if (fs.exists(parquetFile)) {
+      fs.delete(parquetFile, true);
+    }
+
+    // create a test file
+    final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+    final TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+    final ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, new TaskAttemptContext(conf, taskId), protocolFactory, Person.class);
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+    final Person a =
+                    new Person(
+                            new Name("Bob", "Roberts"),
+                            0,
+                            "bob.roberts@example.com",
+                            Arrays.asList(new PhoneNumber("1234567890")));
+    a.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    w.close();
+  }
+
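+  // TODO: enable the commented-out ParquetStorer write path; for now this only reads the thrift-written file back via ParquetLoader and checks the projected email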
+  @Test
+  public void testStorer() throws Exception {
+    writeIt();
+    String out = "target/out.parquet";
+    int rows = 1000; // row count for the commented-out ParquetStorer write path below
+    Properties props = new Properties();
+    props.setProperty("parquet.compression", "uncompressed");
+    props.setProperty("parquet.page.size", "1000");
+    PigServer pigServer = new PigServer(ExecType.LOCAL, props);
+    Storage.Data data = Storage.resetData(pigServer);
+//    Collection<Tuple> list = new ArrayList<Tuple>();
+//    for (int i = 0; i < rows; i++) {
+//      list.add(tuple("a"+i));
+//    }
+//    data.set("in", "a:chararray", list );
+    pigServer.setBatchOn();
+//    pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+//    pigServer.deleteFile(out);
+//    pigServer.registerQuery("Store A into '"+out+"' using "+ParquetStorer.class.getName()+"();");
+//    if (pigServer.executeBatch().get(0).getStatus() != ExecJob.JOB_STATUS.COMPLETED) {
+//      throw new RuntimeException("Job failed", pigServer.executeBatch().get(0).getException());
+//    }
+
+    pigServer.registerQuery("B = LOAD '"+out+"' USING "+ParquetLoader.class.getName()+"();");
+    pigServer.registerQuery("C = foreach B generate email;");
+    pigServer.registerQuery("Store C into 'out' using mock.Storage();");
+    ExecJob job = pigServer.executeBatch().get(0);
+    if (job.getStatus() != ExecJob.JOB_STATUS.COMPLETED) {
+      throw new RuntimeException("Job failed", job.getException());
+    }
+
+    List<Tuple> result = data.get("out");
+
+    // writeIt() stores exactly one Person, and the script projects its email field
+    assertEquals(1, result.size());
+    for (Tuple tuple : result) {
+      assertEquals("bob.roberts@example.com", tuple.get(0).toString());
+    }
+  }
+
   @Test
   public void testPullingInRequiredStructWithFilter() throws Exception {
     final String projectionFilterDesc = "persons/{id};persons/email";