/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.datacollection.writer;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Date;
import java.text.SimpleDateFormat;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkBuilder;
import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
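/**
 * Tests for the Chukwa writer implementations: verifies that
 * {@link ChukwaParquetWriter} and {@link LocalWriter} produce equivalent
 * output for identical input chunks, and that the fixed-interval rotation
 * delay is computed as expected.
 */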
public class TestChukwaWriters extends TestCase {
  public void testWriters() {
    try {
      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
      if (!tempDir.exists()) {
        tempDir.mkdirs();
      }
      String outputDirectory = tempDir.getPath() + "/testChukwaWriters_testWriters_" + System.currentTimeMillis() + "/";

      Configuration confParquetWriter = new Configuration();
      confParquetWriter.set("chukwaCollector.rotateInterval", "300000");
      confParquetWriter.set("writer.hdfs.filesystem", "file:///");
      String parquetWriterOutputDir = outputDirectory + "/parquetWriter/parquetOutputDir";
      confParquetWriter.set(ChukwaParquetWriter.OUTPUT_DIR_OPT, parquetWriterOutputDir);

      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.getLocal(conf);

      ChukwaWriter parquetWriter = new ChukwaParquetWriter(confParquetWriter);
      List<Chunk> chunksParquetWriter = new LinkedList<Chunk>();
      List<Chunk> chunksLocalWriter = new LinkedList<Chunk>();
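      // Build two identical lists of chunks, one per writer, so both writers
      // receive exactly the same input and their dumps can be compared below.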
      for (int i = 0; i < 10; i++) {
        ChunkBuilder cb1 = new ChunkBuilder();
        cb1.addRecord(("record-" + i).getBytes());
        cb1.addRecord("foo".getBytes());
        cb1.addRecord("bar".getBytes());
        cb1.addRecord("baz".getBytes());
        chunksParquetWriter.add(cb1.getChunk());

        ChunkBuilder cb2 = new ChunkBuilder();
        cb2.addRecord(("record-" + i).getBytes());
        cb2.addRecord("foo".getBytes());
        cb2.addRecord("bar".getBytes());
        cb2.addRecord("baz".getBytes());
        chunksLocalWriter.add(cb2.getChunk());
      }
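      // Sleep briefly before adding data, presumably to give the writer's
      // rotation machinery time to initialize (an assumption; the reason is
      // not documented here).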
      Thread.sleep(5000);
      parquetWriter.add(chunksParquetWriter);
      parquetWriter.close();

      String parquetWriterFile = null;
      File directory = new File(parquetWriterOutputDir);
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          parquetWriterFile = parquetWriterOutputDir + File.separator + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file produced by ChukwaParquetWriter", parquetWriterFile);
      String parquetWriterDump = dumpArchive(fs, conf, parquetWriterFile);
      Configuration confLocalWriter = new Configuration();
      confLocalWriter.set("writer.hdfs.filesystem", "file:///");
      String localWriterOutputDir = outputDirectory + "/localWriter/localOutputDir";
      confLocalWriter.set("chukwaCollector.localOutputDir", localWriterOutputDir);
      confLocalWriter.set("chukwaCollector.rotateInterval", "300000");
      // Set a low free-disk threshold so unit tests also pass on machines
      // with mostly-full disks.
      confLocalWriter.set("chukwaCollector.minPercentFreeDisk", "2");

      ChukwaWriter localWriter = new LocalWriter(confLocalWriter);
      localWriter.init(confLocalWriter);
      Thread.sleep(5000);
      localWriter.add(chunksLocalWriter);
      localWriter.close();

      String localWriterFile = null;
      directory = new File(localWriterOutputDir);
      files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          localWriterFile = localWriterOutputDir + File.separator + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file produced by LocalWriter", localWriterFile);
      String localWriterDump = dumpArchive(fs, conf, localWriterFile);
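      // Both writers received identical chunks, so dumping the two archives
      // must yield identical text.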
      Assert.assertEquals(parquetWriterDump, localWriterDump);
      // Best-effort cleanup; File.delete() only removes the directory if it
      // is already empty.
      File fOutputDirectory = new File(outputDirectory);
      fOutputDirectory.delete();
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail("Exception in TestChukwaWriters.testWriters(): " + e.getMessage());
    }
  }
  protected String dumpArchive(FileSystem fs, Configuration conf, String file) throws Throwable {
    AvroParquetReader<GenericRecord> reader = null;
    try {
      reader = new AvroParquetReader<GenericRecord>(conf, new Path(file));
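      // Note: this AvroParquetReader(Configuration, Path) constructor is
      // deprecated in newer parquet-avro releases in favor of the builder
      // API, roughly:
      //   AvroParquetReader.<GenericRecord>builder(new Path(file))
      //       .withConf(conf).build()
      // (verify against the Parquet version actually on the classpath).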
      StringBuilder sb = new StringBuilder();
      GenericRecord record;
      // Read and dump every record in the file.
      while ((record = reader.read()) != null) {
        sb.append("DataType: " + record.get("dataType"));
        sb.append("StreamName: " + record.get("stream"));
        sb.append("SeqId: " + record.get("seqId"));
        sb.append("\t\t =============== ");
        sb.append("Cluster : " + record.get("tags"));
        sb.append("DataType : " + record.get("dataType"));
        sb.append("Source : " + record.get("source"));
        sb.append("Application : " + record.get("stream"));
        sb.append("SeqID : " + record.get("seqId"));
        byte[] data = ((ByteBuffer) record.get("data")).array();
        sb.append("Data : " + new String(data));
      }
      return sb.toString();
    } catch (Throwable e) {
      Assert.fail("Exception while reading Parquet file: " + e.getMessage());
      throw e;
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
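  // NOTE: the timing-based rotation test below is kept commented out. It
  // relies on wall-clock sleeps and filename timestamps, so it is presumably
  // too flaky to run routinely (an assumption; no reason is recorded here).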
// /**
// * Test to check if the .chukwa files are closing at the time we expect them
// * to close. This test sets the rotateInterval and offsetInterval to small
// * values, reads the filename of the first .chukwa file, extracts the
// * timestamp from its name, calculates the timestamp when the next .chukwa
// * file should be closed, sleeps for some time (enough for producing the next
// * .chukwa file), reads the timestamp on the second .chukwa file, and
// * compares the expected close timestamp with the actual closing timestamp of
// * the second file.
// */
//  public void testParquetWriterFixedCloseInterval() {
//    try {
//      long rotateInterval = 10000;
//      long intervalOffset = 3000;
//
//      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
//      if (!tempDir.exists()) {
//        tempDir.mkdirs();
//      }
//
//      String outputDirectory = tempDir.getPath() + "/testChukwaWriters_testChukwaParquetWriterFixedCloseInterval_" +
//          System.currentTimeMillis() + "/";
//      String parquetWriterOutputDir = outputDirectory + "/parquetWriter/parquetOutputDir";
//
//      File directory = new File(parquetWriterOutputDir);
//      // delete any files left over from an earlier run of this test
//      File[] files = directory.listFiles();
//      if (files != null) {
//        for (File file : files) {
//          file.delete();
//        }
//      }
//
//      Configuration confParquetWriter = new Configuration();
//      confParquetWriter.set("chukwaCollector.rotateInterval", String.valueOf(rotateInterval));
//      confParquetWriter.set("writer.hdfs.filesystem", "file:///");
//      confParquetWriter.set(ChukwaParquetWriter.OUTPUT_DIR_OPT, parquetWriterOutputDir);
//      confParquetWriter.set("chukwaCollector.isFixedTimeRotatorScheme", "true");
//      confParquetWriter.set("chukwaCollector.fixedTimeIntervalOffset", String.valueOf(intervalOffset));
//
//      ChukwaWriter parquetWriter = new ChukwaParquetWriter(confParquetWriter);
//
//      // We do not want the test to fail because of a lag between calling
//      // scheduleNextRotation() and the creation of the first .chukwa file,
//      // so make sure rotation starts roughly in the middle of the
//      // rotateInterval (all values below are in milliseconds).
//      long currentTime = System.currentTimeMillis();
//      long timeAfterPrevRotateInterval = currentTime % rotateInterval;
//      if (timeAfterPrevRotateInterval > (rotateInterval - 2000)) {
//        Thread.sleep(2000);
//      }
//
//      String[] fileNames = directory.list();
//      String firstFileName = "";
//      String initialTimestamp = "";
//      // Extract the close time of the first .chukwa file from its name.
//      // An example filename is
//      // 20110531122600002_<host-name>_5f836ece1302899d9a0727e.chukwa
//      for (String file : fileNames) {
//        if (file.endsWith(".chukwa")) {
//          // remember this file so we can recognize it as already visited
//          firstFileName = file;
//          // keep just the timestamp part, i.e. 20110531122600002 in the
//          // example filename above
//          initialTimestamp = file.split("_")[0];
//          // strip off the millisecond part, leaving 20110531122600
//          initialTimestamp = initialTimestamp.substring(0, initialTimestamp.length() - 3);
//          break;
//        }
//      }
//
//      SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
//      Date initialDate = formatter.parse(initialTimestamp);
//      long initialDateInMillis = initialDate.getTime();
//
//      // calculate the expected close time of the next .chukwa file
//      long prevRoundedInterval = initialDateInMillis - (initialDateInMillis %
//          rotateInterval);
//      long expectedNextCloseDate = prevRoundedInterval +
//          rotateInterval + intervalOffset;
//
//      // Sleep for (rotateInterval + intervalOffset); exactly one more
//      // .chukwa file should be produced in this window.
//      long sleepTime = rotateInterval + intervalOffset;
//
//      Thread.sleep(sleepTime);
//      fileNames = directory.list();
//      String nextTimestamp = "";
//      // extract the timestamp of the second .chukwa file
//      for (String file : fileNames) {
//        if (file.endsWith(".chukwa") && !file.equals(firstFileName)) {
//          nextTimestamp = file.split("_")[0];
//          nextTimestamp = nextTimestamp.substring(0, nextTimestamp.length() - 3);
//          break;
//        }
//      }
//
//      Date nextDate = formatter.parse(nextTimestamp);
//      System.out.println("nextTimestamp:" + nextTimestamp);
//      long nextDateInMillis = nextDate.getTime();
//
//      long threshold = 500; // milliseconds
//
//      // The test succeeds only if the timestamp on the second .chukwa file
//      // is very close (differs by < 500 ms) to the expected closing
//      // timestamp calculated above.
//      Assert.assertTrue("File not closed at expected time",
//          (nextDateInMillis - expectedNextCloseDate < threshold));
//      parquetWriter.close();
//
//    } catch (Throwable e) {
//      e.printStackTrace();
//      Assert.fail("Exception in TestChukwaWriters - " +
//          "testParquetWriterFixedCloseInterval(): " + e.getMessage());
//    }
//  }
  /**
   * Test to check the calculation of the delay interval for rotation in
   * ChukwaParquetWriter. It uses an array of known currentTimestamps and
   * their corresponding expectedRotateTimestamps (the next timestamp at which
   * rotation should happen). The actual timestamp of the next rotation is
   * calculated by adding the delay (obtained from getDelayForFixedInterval())
   * to the currentTimestamp.
   */
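  // A worked example of the expected arithmetic, assuming the delay is
  // computed as
  //   delay = rotateInterval - (currentTimestamp % rotateInterval) + offsetInterval
  // (a formula inferred from the expected values below, not quoted from the
  // implementation). For currentTimestamp = 2011/06/15 01:06:00 UTC
  // (1308099960000L), rotateInterval = 300000 and offsetInterval = 60000:
  //   1308099960000 % 300000 = 60000
  //   delay = 300000 - 60000 + 60000 = 300000
  //   rotation at 1308099960000 + 300000 = 1308100260000L (2011/06/15 01:11:00)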
  public void testFixedIntervalOffsetCalculation() {
    try {
      String tmpDir = System.getProperty("test.build.data", "/tmp");
      long ts = System.currentTimeMillis();
      String dataDir = tmpDir + "/TestChukwaWriters_" + ts;

      Configuration conf = new Configuration();
      conf.set("chukwaCollector.outputDir", dataDir + "/log/");
      ChukwaParquetWriter parquetWriter = new ChukwaParquetWriter(conf);

      // Use HH (0-23) rather than hh (1-12), since the timestamps below
      // carry no AM/PM marker.
      SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ssZ");

      // rotateInterval >> offsetInterval
      long rotateInterval = 300000; // 5 min
      long offsetInterval = 60000;  // 1 min
      long[] currentTimestamps = new long[5];
      long[] expectedRotateTimestamps = new long[5];
      Date date = formatter.parse("2011/06/15 01:05:00+0000");
      currentTimestamps[0] = date.getTime();
      expectedRotateTimestamps[0] = 1308100260000L; // 2011/06/15 01:11:00
      date = formatter.parse("2011/06/15 01:06:00+0000");
      currentTimestamps[1] = date.getTime();
      expectedRotateTimestamps[1] = 1308100260000L; // 2011/06/15 01:11:00
      date = formatter.parse("2011/06/15 01:02:00+0000");
      currentTimestamps[2] = date.getTime();
      expectedRotateTimestamps[2] = 1308099960000L; // 2011/06/15 01:06:00
      date = formatter.parse("2011/06/15 01:04:00+0000");
      currentTimestamps[3] = date.getTime();
      expectedRotateTimestamps[3] = 1308099960000L; // 2011/06/15 01:06:00
      // edge case: the rotation crosses an hour boundary
      date = formatter.parse("2011/06/15 01:56:00+0000");
      currentTimestamps[4] = date.getTime();
      expectedRotateTimestamps[4] = 1308103260000L; // 2011/06/15 02:01:00

      int i = 0;
      long expectedDelay = 0;
      long actualRotateTimestamp = 0;
      for (; i < 5; i++) {
        expectedDelay = parquetWriter.getDelayForFixedInterval(
            currentTimestamps[i], rotateInterval, offsetInterval);
        actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
        Assert.assertEquals("Incorrect value for delay",
            expectedRotateTimestamps[i], actualRotateTimestamp);
      }
      // rotateInterval > offsetInterval
      rotateInterval = 60000; // 1 min
      offsetInterval = 30000; // 30 sec
      date = formatter.parse("2011/06/15 01:05:00+0000");
      currentTimestamps[0] = date.getTime();
      expectedRotateTimestamps[0] = 1308099990000L; // 2011/06/15 01:06:30
      date = formatter.parse("2011/06/15 01:04:30+0000");
      currentTimestamps[1] = date.getTime();
      expectedRotateTimestamps[1] = 1308099930000L; // 2011/06/15 01:05:30
      date = formatter.parse("2011/06/15 01:05:30+0000");
      currentTimestamps[2] = date.getTime();
      expectedRotateTimestamps[2] = 1308099990000L; // 2011/06/15 01:06:30
      date = formatter.parse("2011/06/15 01:04:00+0000");
      currentTimestamps[3] = date.getTime();
      expectedRotateTimestamps[3] = 1308099930000L; // 2011/06/15 01:05:30
      // edge case: the rotation crosses an hour boundary
      date = formatter.parse("2011/06/15 01:59:30+0000");
      currentTimestamps[4] = date.getTime();
      expectedRotateTimestamps[4] = 1308103230000L; // 2011/06/15 02:00:30
      for (i = 0; i < 5; i++) {
        expectedDelay = parquetWriter.getDelayForFixedInterval(
            currentTimestamps[i], rotateInterval, offsetInterval);
        actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
        Assert.assertEquals("Incorrect value for delay",
            expectedRotateTimestamps[i], actualRotateTimestamp);
      }
      // rotateInterval == offsetInterval
      rotateInterval = 60000; // 1 min
      offsetInterval = 60000; // 1 min
      date = formatter.parse("2011/06/15 01:02:00+0000");
      currentTimestamps[0] = date.getTime();
      expectedRotateTimestamps[0] = 1308099840000L; // 2011/06/15 01:04:00
      date = formatter.parse("2011/06/15 01:02:30+0000");
      currentTimestamps[1] = date.getTime();
      expectedRotateTimestamps[1] = 1308099840000L; // 2011/06/15 01:04:00
      // edge case: the rotation crosses an hour boundary
      date = formatter.parse("2011/06/15 01:59:30+0000");
      currentTimestamps[2] = date.getTime();
      expectedRotateTimestamps[2] = 1308103260000L; // 2011/06/15 02:01:00
      for (i = 0; i < 3; i++) {
        expectedDelay = parquetWriter.getDelayForFixedInterval(
            currentTimestamps[i], rotateInterval, offsetInterval);
        actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
        Assert.assertEquals("Incorrect value for delay",
            expectedRotateTimestamps[i], actualRotateTimestamp);
      }
      // rotateInterval < offsetInterval
      rotateInterval = 60000;  // 1 min
      offsetInterval = 120000; // 2 min
      date = formatter.parse("2011/06/15 01:02:00+0000");
      currentTimestamps[0] = date.getTime();
      expectedRotateTimestamps[0] = 1308099900000L; // 2011/06/15 01:05:00
      date = formatter.parse("2011/06/15 01:02:30+0000");
      currentTimestamps[1] = date.getTime();
      expectedRotateTimestamps[1] = 1308099900000L; // 2011/06/15 01:05:00
      // edge case: the rotation crosses an hour boundary
      date = formatter.parse("2011/06/15 01:59:30+0000");
      currentTimestamps[2] = date.getTime();
      expectedRotateTimestamps[2] = 1308103320000L; // 2011/06/15 02:02:00
      for (i = 0; i < 3; i++) {
        expectedDelay = parquetWriter.getDelayForFixedInterval(
            currentTimestamps[i], rotateInterval, offsetInterval);
        actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
        Assert.assertEquals("Incorrect value for delay",
            expectedRotateTimestamps[i], actualRotateTimestamp);
      }
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail("Exception in TestChukwaWriters." +
          "testFixedIntervalOffsetCalculation(): " + e.getMessage());
    }
  }
}