blob: e2ac96505f316a53a71a4b2ed44f44b9dda47d49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.util;
import junit.framework.TestCase;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.net.InetAddress;
import java.io.File;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Calendar;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
public class TestCreateRecordFile extends TestCase {
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
private Calendar calendar = Calendar.getInstance();
public void testWriteSequenceFile() throws IOException, ParseException {
String outputDir = System.getProperty("test.build.data", "/tmp");
//input configs
String datadir = System.getenv("CHUKWA_DATA_DIR");
if(datadir == null)
datadir = "test/samples";
else
datadir = datadir + File.separator + "log";
File inputFile = new File( datadir+ File.separator + "ClientTrace.log");
Path outputFile = new Path(outputDir + "/" + this.getClass().getName() + "/ClientTrace.evt");
String clusterName = "testClusterName";
String dataType = "testDataType";
String streamName = "testStreamName";
MapProcessor processor = new TsProcessor();
//create the sequence file
CreateRecordFile.makeTestSequenceFile(inputFile, outputFile, clusterName,
dataType, streamName, processor);
//read the output file
ChukwaRecordKey key = new ChukwaRecordKey();
ChukwaRecord record = new ChukwaRecord();
Configuration conf = new Configuration();
FileSystem fs = outputFile.getFileSystem(conf);
SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, outputFile, conf);
//read the input file to assert
BufferedReader inputReader = new BufferedReader(new FileReader(inputFile));
String expectedHostname = InetAddress.getLocalHost().getHostName();
//Read input and output back comparing each
int i = 0;
while (sequenceReader.next(key, record)) {
String line = inputReader.readLine();
assertNotNull("Sequence file contains more records than input file", line);
long expectedTime = sdf.parse(line.substring(0,23)).getTime();
calendar.setTimeInMillis(expectedTime);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
String expectedKey = calendar.getTimeInMillis() + "/" +
expectedHostname + "/" + expectedTime;
String expectedTags = "cluster=\"" + clusterName + "\"";
//assert key
assertEquals("Invalid key found for record " + i, expectedKey, key.getKey());
assertEquals("Invalid dataType found for record " + i, dataType, key.getReduceType());
//assert record
assertEquals("Invalid record time for record " + i, expectedTime, record.getTime());
assertEquals("Invalid body for record " + i, line, record.getValue("body"));
assertEquals("Invalid capp for record " + i, streamName, record.getValue("capp"));
assertEquals("Invalid csource for record " + i, expectedHostname, record.getValue("csource"));
assertEquals("Invalid ctags for record " + i, expectedTags , record.getValue("ctags").trim());
i++;
}
sequenceReader.close();
inputReader.close();
}
}