CHUKWA-772. Added ChukwaParquetWriter. (Eric Yang)
diff --git a/CHANGES.txt b/CHANGES.txt
index e23338b..f4e7204 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@
NEW FEATURES
+ CHUKWA-772. Added ChukwaParquetWriter. (Eric Yang)
+
CHUKWA-756. Added ajax-solr UI for log search. (Eric Yang)
CHUKWA-755. Added a reverse proxy to Solr for HICC. (Eric Yang)
diff --git a/pom.xml b/pom.xml
index 82a7f03..ba4f745 100644
--- a/pom.xml
+++ b/pom.xml
@@ -327,6 +327,11 @@
<artifactId>shiro-web</artifactId>
<version>1.2.3</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>1.7.0</version>
+ </dependency>
</dependencies>
<developers>
@@ -1326,10 +1331,6 @@
<repositories>
<repository>
- <id>codehaus</id>
- <url>http://repository.codehaus.org/</url>
- </repository>
- <repository>
<id>clojars</id>
<url>http://clojars.org/repo/</url>
</repository>
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java
new file mode 100644
index 0000000..6104750
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer.parquet;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class ChukwaParquetWriter extends PipelineableWriter {
+ private static final Logger LOG = Logger.getLogger(ChukwaParquetWriter.class);
+ public static final String OUTPUT_DIR_OPT = "chukwaCollector.outputDir";
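+ // Parquet row group (block) and page sizes; blockSize is overridden by dfs.blocksize in setup()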
+ private int blockSize = 64 * 1024 * 1024;
+ private int pageSize = 64 * 1024;
+ private Schema avroSchema = null;
+ private AvroParquetWriter<GenericRecord> parquetWriter = null;
+ protected String outputDir = null;
+ private Calendar calendar = Calendar.getInstance();
+ private String localHostAddr = null;
+ private long rotateInterval = 300000L;
+ private long startTime = 0;
+ private Path previousPath = null;
+ private String previousFileName = null;
+ private FileSystem fs = null;
+
+ public ChukwaParquetWriter() throws WriterException {
+ this(ChukwaAgent.getStaticConfiguration());
+ }
+
+ public ChukwaParquetWriter(Configuration c) throws WriterException {
+ setup(c);
+ }
+
+ @Override
+ public void init(Configuration c) throws WriterException {
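+ // configuration is handled in the constructor via setup(); nothing to do here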
+ }
+
+ private void setup(Configuration c) throws WriterException {
+ try {
+ localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
+ } catch (UnknownHostException e) {
+ localHostAddr = "-NA-";
+ }
+ outputDir = c.get(OUTPUT_DIR_OPT, "/chukwa/logs");
+ blockSize = c.getInt("dfs.blocksize", 64 * 1024 * 1024);
+ rotateInterval = c.getLong("chukwaCollector.rotateInterval", 300000L);
+ if(fs == null) {
+ try {
+ fs = FileSystem.get(c);
+ } catch (IOException e) {
+ throw new WriterException(e);
+ }
+ }
+
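+ // Avro schema describing a Chukwa Chunk; seqId is a nullable long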
+ String input = "{\"namespace\": \"chukwa.apache.org\"," +
+ "\"type\": \"record\"," +
+ "\"name\": \"Chunk\"," +
+ "\"fields\": [" +
+ "{\"name\": \"dataType\", \"type\": \"string\"}," +
+ "{\"name\": \"data\", \"type\": \"bytes\"}," +
+ "{\"name\": \"source\", \"type\": \"string\"}," +
+ "{\"name\": \"stream\", \"type\": \"string\"}," +
+ "{\"name\": \"tags\", \"type\": \"string\"}," +
+ "{\"name\": \"seqId\", \"type\": [\"long\", \"null\"]}" +
+ "]"+
+ "}";
+
+ // parse the Avro schema used for every Chunk record
+ avroSchema = new Schema.Parser().parse(input);
+ // open the first output file
+ rotate();
+ }
+
+ @Override
+ public void close() throws WriterException {
+ try {
+ parquetWriter.close();
+ fs.rename(previousPath, new Path(previousFileName + ".done"));
+ } catch (IOException e) {
+ throw new WriterException(e);
+ }
+ }
+
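+ /**
+  * Convert each chunk to an Avro record and append it to the current
+  * Parquet file, rotating to a new file once rotateInterval has elapsed.
+  */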
+ @Override
+ public CommitStatus add(List<Chunk> chunks) throws WriterException {
+ long elapsedTime = 0;
+ CommitStatus rv = ChukwaWriter.COMMIT_OK;
+ for(Chunk chunk : chunks) {
+ try {
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("dataType", chunk.getDataType());
+ record.put("data", ByteBuffer.wrap(chunk.getData()));
+ record.put("tags", chunk.getTags());
+ record.put("seqId", chunk.getSeqID());
+ record.put("source", chunk.getSource());
+ record.put("stream", chunk.getStreamName());
+ parquetWriter.write(record);
+ elapsedTime = System.currentTimeMillis() - startTime;
+ if(elapsedTime > rotateInterval) {
+ rotate();
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to store data to HDFS.");
+ LOG.warn(ExceptionUtil.getStackTrace(e));
+ }
+ }
+ if (next != null) {
+ rv = next.add(chunks); //pass data through
+ }
+ return rv;
+ }
+
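+ /**
+  * Close the current Parquet file, mark it final by renaming it with a
+  * .done suffix, then open a new file named by timestamp, hostname and a unique ID.
+  */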
+ private void rotate() throws WriterException {
+ if(parquetWriter!=null) {
+ try {
+ parquetWriter.close();
+ fs.rename(previousPath, new Path(previousFileName + ".done"));
+ } catch (IOException e) {
+ LOG.warn("Fail to close Chukwa write ahead log.");
+ }
+ }
+ startTime = System.currentTimeMillis();
+ calendar.setTimeInMillis(startTime);
+
+ String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
+ .format(calendar.getTime());
+ newName += localHostAddr + new java.rmi.server.UID().toString();
+ newName = newName.replace("-", "");
+ newName = newName.replace(":", "");
+ newName = newName.replace(".", "");
+ newName = outputDir + "/" + newName.trim();
+ LOG.info("writing: "+newName);
+ Path path = new Path(newName);
+ try {
+ parquetWriter = new AvroParquetWriter<GenericRecord>(path, avroSchema, CompressionCodecName.SNAPPY, blockSize, pageSize);
+ previousPath = path;
+ previousFileName = newName;
+ } catch (IOException e) {
+ throw new WriterException(e);
+ }
+ }
+}
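A minimal usage sketch of the new writer (illustration only, not part of this
patch), mirroring what TestChukwaParquetWriter below does; the output directory
value is an assumption for the example:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.ChunkBuilder;
    import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
    import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter;
    import org.apache.hadoop.conf.Configuration;

    public class ParquetWriterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // OUTPUT_DIR_OPT is "chukwaCollector.outputDir"; the path is illustrative
        conf.set(ChukwaParquetWriter.OUTPUT_DIR_OPT, "/tmp/chukwa/logs");
        ChukwaWriter writer = new ChukwaParquetWriter(conf);

        List<Chunk> chunks = new ArrayList<Chunk>();
        ChunkBuilder cb = new ChunkBuilder();
        cb.addRecord(ByteBuffer.allocate(4).putInt(42).array());
        chunks.add(cb.getChunk());

        writer.add(chunks);  // serializes each chunk as an Avro record
        writer.close();      // closes the file and renames it with a .done suffix
      }
    }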
diff --git a/src/site/apt/pipeline.apt b/src/site/apt/pipeline.apt
index 794bea3..274e596 100644
--- a/src/site/apt/pipeline.apt
+++ b/src/site/apt/pipeline.apt
@@ -46,20 +46,12 @@
---
<property>
<name>chukwa.pipeline</name>
- <value>org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
+ <value>org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter</value>
</property>
---
- In this mode, the filesystem to write to is determined by the option
-<writer.hdfs.filesystem> in <chukwa-agent-conf.xml>.
-
----
-<property>
- <name>writer.hdfs.filesystem</name>
- <value>hdfs://localhost:8020/</value>
- <description>HDFS to dump to</description>
-</property>
----
+ In this mode, data is written to the HDFS cluster defined by the
+HADOOP_CONF_DIR environment variable.
This is the only option that you really need to specify to get a working
pipeline.
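+
+ For example, assuming a typical client-side Hadoop configuration directory
+(the exact path depends on your installation):
+
+---
+export HADOOP_CONF_DIR=/etc/hadoop/conf
+---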
@@ -85,7 +77,7 @@
---
<property>
<name>chukwa.pipeline</name>
- <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
+ <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter</value>
</property>
---
@@ -142,23 +134,12 @@
</property>
---
-SeqFileWriter
+ChukwaParquetWriter
- The <SeqFileWriter> streams chunks of data to HDFS, and write data in
-temp filename with <.chukwa> suffix. When the file is completed writing,
-the filename is renamed with <.done> suffix. SeqFileWriter has the following
+ The <ChukwaParquetWriter> streams chunks of data to HDFS as Parquet files. When a file
+has finished writing, it is renamed with a <.done> suffix. ChukwaParquetWriter has the following
configuration in <chukwa-agent-conf.xml>.
- * <<writer.hdfs.filesystem>> Location to name node address
-
----
-<property>
- <name>writer.hdfs.filesystem</name>
- <value>hdfs://localhost:8020/</value>
- <description>HDFS to dump to</description>
-</property>
----
-
* <<chukwaCollector.outputDir>> Location of collect data sink directory
---
@@ -179,35 +160,6 @@
</property>
---
- * <<chukwaCollector.isFixedTimeRotatorScheme>> A flag to indicate that the
- agent should close at a fixed offset after every rotateInterval.
- The default value is false which uses the default scheme where
- agents close after regular rotateIntervals.
- If set to true then specify chukwaCollector.fixedTimeIntervalOffset value.
- e.g., if isFixedTimeRotatorScheme is true and fixedTimeIntervalOffset is
- set to 10000 and rotateInterval is set to 300000, then the agent will
- close its files at 10 seconds past the 5 minute mark, if
- isFixedTimeRotatorScheme is false, agents will rotate approximately
- once every 5 minutes
-
----
- <property>
- <name>chukwaCollector.isFixedTimeRotatorScheme</name>
- <value>false</value>
- </property>
----
-
- * <<chukwaCollector.fixedTimeIntervalOffset>> Chukwa fixed time interval
- offset value (ms)
-
----
-<property>
- <name>chukwaCollector.fixedTimeIntervalOffset</name>
- <value>30000</value>
- <description>Chukwa fixed time interval offset value (ms)</description>
-</property>
----
-
SocketTeeWriter
The <SocketTeeWriter> allows external processes to watch
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java
new file mode 100644
index 0000000..643a8c5
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.junit.Assert;
+
+import junit.framework.TestCase;
+
+public class TestChukwaParquetWriter extends TestCase {
+ private final static Logger LOG = Logger.getLogger(TestChukwaParquetWriter.class);
+ /**
+ * Test records are written properly.
+ */
+ public void testWrite() {
+ // Write 10 chunks
+ ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+ for(int i=0;i<10;i++) {
+ ChunkBuilder c = new ChunkBuilder();
+ // Integer.SIZE is in bits; divide by Byte.SIZE to allocate exactly 4 bytes
+ c.addRecord(ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(i).array());
+ chunks.add(c.getChunk());
+ }
+ try {
+ Configuration conf = new Configuration();
+ String outputPath = System.getProperty("test.log.dir")+"/testParquet";
+ conf.set("chukwaCollector.outputDir", outputPath);
+ ChukwaWriter parquetWriter = new ChukwaParquetWriter(conf);
+ parquetWriter.add(chunks);
+ parquetWriter.close();
+ FileSystem fs = FileSystem.get(conf);
+ // Verify 10 chunks are written
+ Path file = new Path(outputPath);
+ FileStatus[] status = fs.listStatus(file);
+ for(FileStatus finfo : status) {
+ if(finfo.getPath().getName().contains(".done")) {
+ LOG.info("File name: "+finfo.getPath().getName());
+ LOG.info("File Size: " + finfo.getLen());
+ ParquetReader<GenericRecord> pr = ParquetReader.builder(new AvroReadSupport<GenericRecord>(), finfo.getPath()).build();
+ for(int i=0; i< 10; i++) {
+ GenericRecord nextRecord = pr.read();
+ int expected = ByteBuffer.wrap(chunks.get(i).getData()).getInt();
+ LOG.info("expected: " + expected);
+ ByteBuffer content = (ByteBuffer) nextRecord.get("data");
+ int actual = content.getInt();
+ LOG.info("actual: " + actual);
+ // assertEquals compares values; assertSame would compare autoboxed Integer references
+ Assert.assertEquals(expected, actual);
+ }
+ pr.close();
+ }
+ fs.delete(finfo.getPath(), true);
+ }
+ } catch (WriterException e) {
+ Assert.fail(e.getMessage());
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test file rotation interval.
+ */
+ public void testRotate() {
+ // Write 10 chunks
+ ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+ for(int i=0;i<10;i++) {
+ ChunkBuilder c = new ChunkBuilder();
+ // as above, allocate 4 bytes per int record
+ c.addRecord(ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(i).array());
+ chunks.add(c.getChunk());
+ }
+ try {
+ Configuration conf = new Configuration();
+ String outputPath = System.getProperty("test.log.dir")+"/testParquetRotate";
+ conf.set("chukwaCollector.outputDir", outputPath);
+ conf.setLong("chukwaCollector.rotateInterval", 3000L);
+ ChukwaWriter parquetWriter = new ChukwaParquetWriter(conf);
+ for(int i=0; i<2; i++) {
+ parquetWriter.add(chunks);
+ try {
+ Thread.sleep(3000L);
+ } catch (InterruptedException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+ parquetWriter.close();
+ FileSystem fs = FileSystem.get(conf);
+ // Verify that rotation produced at least two data sink files
+ Path file = new Path(outputPath);
+ FileStatus[] status = fs.listStatus(file);
+ Assert.assertTrue(status.length >= 2);
+ } catch (WriterException e) {
+ Assert.fail(e.getMessage());
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
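A read-back sketch (illustration only): scanning records out of a finished
.done file with the same parquet-avro 1.7.0 API the test uses. The field
names come from the Avro schema defined in ChukwaParquetWriter.

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroReadSupport;
    import org.apache.parquet.hadoop.ParquetReader;

    public class ParquetReadExample {
      public static void main(String[] args) throws Exception {
        Path done = new Path(args[0]);  // path to a *.done file
        ParquetReader<GenericRecord> reader =
            ParquetReader.builder(new AvroReadSupport<GenericRecord>(), done).build();
        GenericRecord record;
        while ((record = reader.read()) != null) {  // read() returns null at end of file
          System.out.println(record.get("dataType") + " from " + record.get("source"));
        }
        reader.close();
      }
    }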