/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.tools.backfilling;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaAvroSchema;
import org.apache.hadoop.chukwa.validationframework.util.MD5;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
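
/**
 * End-to-end tests for {@link BackfillingLoader}. Each test generates a
 * synthetic log file, backfills it through a given adaptor, and then verifies
 * that the resulting ".done" data sink file reproduces the original bytes
 * (via an MD5 comparison) and that the last seqId equals the input file size.
 */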
public class TestBackfillingLoader extends TestCase {
private String cluster = "chukwa";
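
  /**
   * Backfills a small (107-line) file through the
   * CharFileTailingAdaptorUTF8NewLineEscaped adaptor and verifies the data sink.
   */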
public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscaped() {
String tmpDir = System.getProperty("test.build.data", "/tmp");
long ts = System.currentTimeMillis();
String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
Configuration conf = new Configuration();
conf.set("writer.hdfs.filesystem", "file:///");
conf.set("chukwaCollector.outputDir", dataDir + "/log/");
conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
String recordType = "MyRecordType_" + ts;
try {
FileSystem fs = FileSystem.getLocal(conf);
File in1Dir = new File(dataDir + "/input");
in1Dir.mkdirs();
int lineCount = 107;
File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount);
long size = inputFile.length();
String logFile = inputFile.getAbsolutePath();
System.out.println("Output:" + logFile);
System.out.println("File:" + inputFile.length());
BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
loader.process();
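      // process() consumes the input file and renames it with a ".sav" suffix.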
File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");
      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());
      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      Assert.assertNotNull("Data sink directory was not created", files);
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("No .done data sink file found in " + directory, doneFile);
long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
cluster, recordType, machine, logFile);
      Assert.assertEquals("Last seqId should equal the input file size", size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail("Unexpected exception: " + e);
    }
try {
FileUtils.deleteDirectory(new File(dataDir));
} catch (IOException e) {
e.printStackTrace();
}
}
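
  /**
   * Same end-to-end check, but using the whole-file FileAdaptor instead of the
   * line-oriented tailing adaptor.
   */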
public void testBackfillingLoaderWithFileAdaptor() {
String tmpDir = System.getProperty("test.build.data", "/tmp");
long ts = System.currentTimeMillis();
String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
Configuration conf = new Configuration();
conf.set("writer.hdfs.filesystem", "file:///");
conf.set("chukwaCollector.outputDir", dataDir + "/log/");
conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor";
String recordType = "MyRecordType_" + ts;
try {
FileSystem fs = FileSystem.getLocal(conf);
File in1Dir = new File(dataDir + "/input");
in1Dir.mkdirs();
int lineCount = 118;
File inputFile = makeTestFile(dataDir + "/input/in2.txt",lineCount);
long size = inputFile.length();
String logFile = inputFile.getAbsolutePath();
System.out.println("Output:" + logFile);
System.out.println("File:" + inputFile.length());
BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
loader.process();
File finalOutputFile = new File(dataDir + "/input/in2.txt.sav");
Assert.assertTrue("Input file exists", inputFile.exists() == false);
Assert.assertTrue("Final input file exists", finalOutputFile.exists() == true);
      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      Assert.assertNotNull("Data sink directory was not created", files);
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("No .done data sink file found in " + directory, doneFile);
long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
cluster, recordType, machine, logFile);
      Assert.assertEquals("Last seqId should equal the input file size", size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail("Unexpected exception: " + e);
    }
try {
FileUtils.deleteDirectory(new File(dataDir));
} catch (IOException e) {
e.printStackTrace();
}
}
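
  /**
   * Large-file variant: pushes roughly 35 MB through the tailing adaptor to
   * exercise chunking across many reads.
   */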
public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFile() {
String tmpDir = System.getProperty("test.build.data", "/tmp");
long ts = System.currentTimeMillis();
String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
Configuration conf = new Configuration();
conf.set("writer.hdfs.filesystem", "file:///");
conf.set("chukwaCollector.outputDir", dataDir + "/log/");
conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
String recordType = "MyRecordType_" + ts;
try {
FileSystem fs = FileSystem.getLocal(conf);
File in1Dir = new File(dataDir + "/input");
in1Dir.mkdirs();
      int lineCount = 1024 * 1024; // roughly 35 MB of generated data
File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount);
long size = inputFile.length();
String logFile = inputFile.getAbsolutePath();
System.out.println("Output:" + logFile);
System.out.println("File:" + inputFile.length());
BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
loader.process();
File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");
      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());
      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      Assert.assertNotNull("Data sink directory was not created", files);
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("No .done data sink file found in " + directory, doneFile);
long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
cluster, recordType, machine, logFile);
      Assert.assertEquals("Last seqId should equal the input file size", size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail("Unexpected exception: " + e);
    }
try {
FileUtils.deleteDirectory(new File(dataDir));
} catch (IOException e) {
e.printStackTrace();
}
}
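
  /**
   * Large-file variant that swaps in the LocalWriter
   * (chukwaCollector.writerClass) so chunks are written through the
   * local-filesystem writer path.
   */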
public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFileLocalWriter() {
String tmpDir = System.getProperty("test.build.data", "/tmp");
long ts = System.currentTimeMillis();
String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;
Configuration conf = new Configuration();
conf.set("writer.hdfs.filesystem", "file:///");
conf.set("chukwaCollector.outputDir", dataDir + "/log/");
conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
conf.set("chukwaCollector.localOutputDir", dataDir + "/log/");
conf.set("chukwaCollector.writerClass", "org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter");
conf.set("chukwaCollector.minPercentFreeDisk", "2");//so unit tests pass on machines with full-ish disks
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
String recordType = "MyRecordType_" + ts;
try {
FileSystem fs = FileSystem.getLocal(conf);
File in1Dir = new File(dataDir + "/input");
in1Dir.mkdirs();
      int lineCount = 1024 * 1024 * 2; // roughly 70 MB of generated data
File inputFile = makeTestFile(dataDir + "/input/in1.txt",lineCount);
long size = inputFile.length();
String logFile = inputFile.getAbsolutePath();
System.out.println("Output:" + logFile);
System.out.println("File:" + inputFile.length());
BackfillingLoader loader = new BackfillingLoader(conf,cluster,machine,adaptorName,recordType,logFile);
loader.process();
File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");
      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());
      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      Assert.assertNotNull("Data sink directory was not created", files);
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("No .done data sink file found in " + directory, doneFile);
long seqId = validateDataSink(fs,conf,doneFile,finalOutputFile,
cluster, recordType, machine, logFile);
      Assert.assertEquals("Last seqId should equal the input file size", size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail("Unexpected exception: " + e);
    }
try {
FileUtils.deleteDirectory(new File(dataDir));
} catch (IOException e) {
e.printStackTrace();
}
}
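
  /**
   * Reads every record back from the ".done" data sink, checks per-record
   * metadata (tags, dataType, source), appends each payload to a ".dump" file,
   * and compares that file's MD5 checksum against the original log file.
   *
   * @return the seqId of the last record read, or -1 if the sink was empty
   */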
  protected long validateDataSink(FileSystem fs, Configuration conf, String dataSinkFile, File logFile,
      String cluster, String dataType, String source, String application) throws Throwable {
AvroParquetReader<GenericRecord> reader = null;
long lastSeqId = -1;
FileOutputStream out = null;
try {
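      // Request the Chukwa Avro schema as the projection so the Parquet reader
      // materializes GenericRecords with the expected fields.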
Schema chukwaAvroSchema = ChukwaAvroSchema.getSchema();
AvroReadSupport.setRequestedProjection(conf, chukwaAvroSchema);
reader = new AvroParquetReader<GenericRecord>(conf, new Path(dataSinkFile));
String dataSinkDumpName = dataSinkFile + ".dump";
out = new FileOutputStream(new File(dataSinkDumpName), true);
      GenericRecord record = null;
      // Re-read every chunk from the data sink, verify its metadata, and
      // append the raw payload bytes to the dump file.
      while ((record = reader.read()) != null) {
        Assert.assertTrue(record.get("tags").toString().contains(cluster));
        Assert.assertEquals(dataType, record.get("dataType").toString());
        Assert.assertEquals(source, record.get("source").toString());
        ByteBuffer data = (ByteBuffer) record.get("data");
        out.write(data.array(), data.arrayOffset() + data.position(), data.remaining());
        lastSeqId = ((Long) record.get("seqId")).longValue();
      }
out.close();
out = null;
reader.close();
reader = null;
      String dataSinkMD5 = MD5.checksum(new File(dataSinkDumpName));
      String logFileMD5 = MD5.checksum(logFile);
      Assert.assertEquals("Data sink dump should match the original log file", logFileMD5, dataSinkMD5);
}
finally {
if (out != null) {
out.close();
}
if (reader != null) {
reader.close();
}
}
return lastSeqId;
}
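
  /**
   * Writes {@code lineCount} numbered lines of fixed content so the generated
   * file's bytes are deterministic and can be checksummed.
   */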
  private File makeTestFile(final String name, int lineCount) throws IOException {
    File tmpOutput = new File(name);
    FileOutputStream fos = new FileOutputStream(tmpOutput);
    PrintWriter pw = new PrintWriter(fos);
    for (int i = 0; i < lineCount; ++i) {
      pw.print(i + " ");
      pw.println("abcdefghijklmnopqrstuvwxyz");
    }
pw.flush();
pw.close();
fos.close();
return tmpOutput;
}
}