CHUKWA-772. Added ChukwaParquetWriter.  (Eric Yang)
diff --git a/CHANGES.txt b/CHANGES.txt
index e23338b..f4e7204 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-772. Added ChukwaParquetWriter.  (Eric Yang)
+
     CHUKWA-756. Added ajax-solr UI for log search.  (Eric Yang)
 
     CHUKWA-755. Added a reverse proxy to Solr for HICC.  (Eric Yang)
diff --git a/pom.xml b/pom.xml
index 82a7f03..ba4f745 100644
--- a/pom.xml
+++ b/pom.xml
@@ -327,6 +327,11 @@
             <artifactId>shiro-web</artifactId>
             <version>1.2.3</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>1.7.0</version>
+          </dependency>
     </dependencies>
 
     <developers>
@@ -1326,10 +1331,6 @@
 
     <repositories>
         <repository>
-            <id>codehaus</id>
-            <url>http://repository.codehaus.org/</url>
-        </repository>
-        <repository>
             <id>clojars</id>
             <url>http://clojars.org/repo/</url>
         </repository>
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java
new file mode 100644
index 0000000..6104750
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer.parquet;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
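+/**
+ * ChukwaParquetWriter streams Chukwa chunks to HDFS as Snappy-compressed
+ * Parquet files.  The output file is rotated on a configurable interval
+ * (chukwaCollector.rotateInterval, default 5 minutes) and renamed with a
+ * .done suffix once it has been closed.
+ */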
+public class ChukwaParquetWriter extends PipelineableWriter {
+  private static final Logger LOG = Logger.getLogger(ChukwaParquetWriter.class);
+  public static final String OUTPUT_DIR_OPT = "chukwaCollector.outputDir";
+  private int blockSize = 64 * 1024 * 1024;
+  private int pageSize = 64 * 1024;
+  private Schema avroSchema = null;
+  private AvroParquetWriter<GenericRecord> parquetWriter = null;
+  protected String outputDir = null;
+  private Calendar calendar = Calendar.getInstance();
+  private String localHostAddr = null;
+  private long rotateInterval = 300000L;
+  private long startTime = 0;
+  private Path previousPath = null;
+  private String previousFileName = null;
+  private FileSystem fs = null;
+  
+  public ChukwaParquetWriter() throws WriterException {
+    this(ChukwaAgent.getStaticConfiguration());
+  }
+
+  public ChukwaParquetWriter(Configuration c) throws WriterException {
+    setup(c);
+  }
+
+  @Override
+  public void init(Configuration c) throws WriterException {
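+    // no-op; configuration is applied in the constructor via setup(Configuration)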
+  }
+
+  private void setup(Configuration c) throws WriterException {
+    try {
+      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
+    } catch (UnknownHostException e) {
+      localHostAddr = "-NA-";
+    }
+    outputDir = c.get(OUTPUT_DIR_OPT, "/chukwa/logs");
+    blockSize = c.getInt("dfs.blocksize", 64 * 1024 * 1024);
+    rotateInterval = c.getLong("chukwaCollector.rotateInterval", 300000L);
+    if(fs == null) {
+      try {
+        fs = FileSystem.get(c);
+      } catch (IOException e) {
+        throw new WriterException(e);
+      }
+    }
+
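+    // inline Avro schema describing a Chukwa chunk record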
+    String input = "{\"namespace\": \"chukwa.apache.org\"," +
+      "\"type\": \"record\"," +
+      "\"name\": \"Chunk\"," +
+      "\"fields\": [" +
+          "{\"name\": \"dataType\", \"type\": \"string\"}," +
+          "{\"name\": \"data\", \"type\": \"bytes\"}," +
+          "{\"name\": \"source\", \"type\": \"string\"}," +
+          "{\"name\": \"stream\", \"type\": \"string\"}," +
+          "{\"name\": \"tags\", \"type\": \"string\"}," +
+          "{\"name\": \"seqId\",  \"type\": [\"long\", \"null\"]}" +
+      "]"+
+     "}";
+
+    // parse the Avro schema; AvroParquetWriter derives the Parquet schema from it
+    avroSchema = new Schema.Parser().parse(input);
+    // open the initial output file
+    rotate();
+  }
+
+  @Override
+  public void close() throws WriterException {
+    try {
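+      // close the current file and rename it with a .done suffix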
+      parquetWriter.close();
+      fs.rename(previousPath, new Path(previousFileName + ".done"));
+    } catch (IOException e) {
+      throw new WriterException(e);
+    }
+  }
+
+  @Override
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    long elapsedTime = 0;
+    CommitStatus rv = ChukwaWriter.COMMIT_OK;
+    for(Chunk chunk : chunks) {
+      try {
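+        // map the chunk fields onto the Avro record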
+        GenericRecord record = new GenericData.Record(avroSchema);
+        record.put("dataType", chunk.getDataType());
+        record.put("data", ByteBuffer.wrap(chunk.getData()));
+        record.put("tags", chunk.getTags());
+        record.put("seqId", chunk.getSeqID());
+        record.put("source", chunk.getSource());
+        record.put("stream", chunk.getStreamName());
+        parquetWriter.write(record);
+        elapsedTime = System.currentTimeMillis() - startTime;
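+        // rotate the output file once the rotation interval has elapsed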
+        if(elapsedTime > rotateInterval) {
+          rotate();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to store data to HDFS.");
+        LOG.warn(ExceptionUtil.getStackTrace(e));
+      }
+    }
+    if (next != null) {
+      rv = next.add(chunks); //pass data through
+    }
+    return rv;
+  }
+  
+  private void rotate() throws WriterException {
+    if(parquetWriter!=null) {
+      try {
+        parquetWriter.close();
+        fs.rename(previousPath, new Path(previousFileName + ".done"));
+      } catch (IOException e) {
+        LOG.warn("Fail to close Chukwa write ahead log.");
+      }
+    }
+    startTime = System.currentTimeMillis();
+    calendar.setTimeInMillis(startTime);
+
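+    // name the file after the current timestamp, the local host name and a
+    // unique ID, stripping characters that are unsafe for file names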
+    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
+        .format(calendar.getTime());
+    newName += localHostAddr + new java.rmi.server.UID().toString();
+    newName = newName.replace("-", "");
+    newName = newName.replace(":", "");
+    newName = newName.replace(".", "");
+    newName = outputDir + "/" + newName.trim();
+    LOG.info("writing: "+newName);
+    Path path = new Path(newName);
+    try {
+      parquetWriter = new AvroParquetWriter<GenericRecord>(path, avroSchema, CompressionCodecName.SNAPPY, blockSize, pageSize);
+      previousPath = path;
+      previousFileName = newName;
+    } catch (IOException e) {
+      throw new WriterException(e);
+    }
+  }
+}
diff --git a/src/site/apt/pipeline.apt b/src/site/apt/pipeline.apt
index 794bea3..274e596 100644
--- a/src/site/apt/pipeline.apt
+++ b/src/site/apt/pipeline.apt
@@ -46,20 +46,12 @@
 ---
 <property>
   <name>chukwa.pipeline</name>
-  <value>org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
+  <value>org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter</value>
 </property>
 ---
 
-  In this mode, the filesystem to write to is determined by the option
-<writer.hdfs.filesystem> in <chukwa-agent-conf.xml>.
-
----
-<property>
-    <name>writer.hdfs.filesystem</name>
-    <value>hdfs://localhost:8020/</value>
-    <description>HDFS to dump to</description>
-</property>
----
+  In this mode, data is written to the HDFS instance defined by the HADOOP_CONF_DIR
+environment variable.
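+
+  For example, assuming a single-node HDFS instance, the <core-site.xml> file in
+HADOOP_CONF_DIR might contain:
+
+---
+<property>
+    <name>fs.defaultFS</name>
+    <value>hdfs://localhost:8020/</value>
+</property>
+---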
 
   This is the only option that you really need to specify to get a working 
 pipeline.
@@ -85,7 +77,7 @@
 ---
 <property>
   <name>chukwa.pipeline</name>
-  <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
+  <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter</value>
 </property>
 ---
 
@@ -142,23 +134,12 @@
 </property>
 ---
 
-SeqFileWriter
+ChukwaParquetWriter
 
-  The <SeqFileWriter> streams chunks of data to HDFS, and write data in
-temp filename with <.chukwa> suffix.  When the file is completed writing,
-the filename is renamed with <.done> suffix.  SeqFileWriter has the following
+  The <ChukwaParquetWriter> streams chunks of data to HDFS.  When a file has been
+completely written, it is renamed with a <.done> suffix.  ChukwaParquetWriter has the following
 configuration in <chukwa-agent-conf.xml>.
 
-  * <<writer.hdfs.filesystem>> Location to name node address
-
----
-<property>
-    <name>writer.hdfs.filesystem</name>
-    <value>hdfs://localhost:8020/</value>
-    <description>HDFS to dump to</description>
-</property>
----
-
   * <<chukwaCollector.outputDir>> Location of collect data sink directory
 
 ---
@@ -179,35 +160,6 @@
 </property>
 ---
 
-  * <<chukwaCollector.isFixedTimeRotatorScheme>> A flag to indicate that the 
-    agent should close at a fixed offset after every rotateInterval. 
-    The default value is false which uses the default scheme where 
-    agents close after regular rotateIntervals.
-    If set to true then specify chukwaCollector.fixedTimeIntervalOffset value.
-    e.g., if isFixedTimeRotatorScheme is true and fixedTimeIntervalOffset is
-    set to 10000 and rotateInterval is set to 300000, then the agent will
-    close its files at 10 seconds past the 5 minute mark, if
-    isFixedTimeRotatorScheme is false, agents will rotate approximately
-    once every 5 minutes
-
----
-  <property>
-    <name>chukwaCollector.isFixedTimeRotatorScheme</name>
-    <value>false</value>
-  </property>
----
-
-  * <<chukwaCollector.fixedTimeIntervalOffset>> Chukwa fixed time interval 
-    offset value (ms)
-
----
-<property>
-    <name>chukwaCollector.fixedTimeIntervalOffset</name>
-    <value>30000</value>
-    <description>Chukwa fixed time interval offset value (ms)</description>
-</property>
----
-
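+  Completed files can be inspected with standard Parquet tooling.  For example,
+assuming the parquet-tools jar is available, a hypothetical session might look
+like:
+
+---
+hadoop jar parquet-tools-1.7.0.jar schema /chukwa/logs/<filename>.done
+hadoop jar parquet-tools-1.7.0.jar head /chukwa/logs/<filename>.done
+---
+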
 SocketTeeWriter
 
   The <SocketTeeWriter> allows external processes to watch
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java
new file mode 100644
index 0000000..643a8c5
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.junit.Assert;
+
+import junit.framework.TestCase;
+
+public class TestChukwaParquetWriter extends TestCase {
+  private final static Logger LOG = Logger.getLogger(TestChukwaParquetWriter.class);
+  /**
+   * Test records are written properly.
+   */
+  public void testWrite() {
+    // Write 10 chunks
+    ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+    for(int i=0;i<10;i++) {
+      ChunkBuilder c = new ChunkBuilder();
+      c.addRecord(ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(i).array());
+      chunks.add(c.getChunk());
+    }
+    try {
+      Configuration conf = new Configuration();
+      String outputPath = System.getProperty("test.log.dir")+"/testParquet";
+      conf.set("chukwaCollector.outputDir", outputPath);
+      ChukwaWriter parquetWriter = new ChukwaParquetWriter(conf);
+      parquetWriter.add(chunks);
+      parquetWriter.close();
+      FileSystem fs = FileSystem.get(conf);
+      // Verify the 10 chunks were written
+      Path file = new Path(outputPath);
+      FileStatus[] status = fs.listStatus(file);
+      for(FileStatus finfo : status) {
+        if(finfo.getPath().getName().contains(".done")) {
+          LOG.info("File name: "+finfo.getPath().getName());
+          LOG.info("File Size: " + finfo.getLen());
+          ParquetReader<GenericRecord> pr = ParquetReader.builder(new AvroReadSupport<GenericRecord>(), finfo.getPath()).build();
+          for(int i=0; i< 10; i++) {
+            GenericRecord nextRecord = pr.read();
+            int expected = ByteBuffer.wrap(chunks.get(i).getData()).getInt();
+            LOG.info("expected: " + expected);
+            ByteBuffer content = (ByteBuffer) nextRecord.get("data");
+            int actual = content.getInt();
+            LOG.info("actual: " + actual);
+            Assert.assertEquals(expected, actual);
+          }
+          pr.close();
+        }
+        fs.delete(finfo.getPath(), true);
+      }
+    } catch (WriterException e) {
+      Assert.fail(e.getMessage());
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Test file rotation interval.
+   */
+  public void testRotate() {
+    // Write 10 chunks
+    ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+    for(int i=0;i<10;i++) {
+      ChunkBuilder c = new ChunkBuilder();
+      c.addRecord(ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(i).array());
+      chunks.add(c.getChunk());
+    }
+    try {
+      Configuration conf = new Configuration();
+      String outputPath = System.getProperty("test.log.dir")+"/testParquetRotate";
+      conf.set("chukwaCollector.outputDir", outputPath);
+      conf.setLong("chukwaCollector.rotateInterval", 3000L);
+      ChukwaWriter parquetWriter = new ChukwaParquetWriter(conf);
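+      // write two batches, sleeping past the rotate interval so a new file is opened for each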
+      for(int i=0; i<2; i++) {
+        parquetWriter.add(chunks);
+        try {
+          Thread.sleep(3000L);
+        } catch (InterruptedException e) {
+          Assert.fail(e.getMessage());
+        }
+      }
+      parquetWriter.close();
+      FileSystem fs = FileSystem.get(conf);
+      // Verify that rotation produced at least two output files
+      Path file = new Path(outputPath);
+      FileStatus[] status = fs.listStatus(file);
+      Assert.assertTrue(status.length >= 2);
+    } catch (WriterException e) {
+      Assert.fail(e.getMessage());
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+}