/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.tool;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.avro.AvroTestUtil;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.junit.Test;
public class TestCatTool {
private static final int ROWS_IN_INPUT_FILES = 100000;
private static final int OFFSET = 1000;
private static final int LIMIT_WITHIN_INPUT_BOUNDS = 100;
private static final int LIMIT_OUT_OF_INPUT_BOUNDS = 100001;
private static final double SAMPLERATE = .01;
private static final double SAMPLERATE_TOO_SMALL = .00000001;
private final Schema INTSCHEMA = new Schema.Parser().parse(
"{\"type\":\"record\", " +
"\"name\":\"myRecord\", " +
"\"fields\":[ " +
"{\"name\":\"value\",\"type\":\"int\"} " +
"]}");
private final Schema STRINGSCHEMA = new Schema.Parser().parse(
"{\"type\":\"record\", " +
"\"name\":\"myRecord\", " +
"\"fields\":[ {\"name\":\"value\",\"type\":\"string\"} " +
"]}");
private static final CodecFactory DEFLATE = CodecFactory.deflateCodec(9);
private static final CodecFactory SNAPPY = CodecFactory.snappyCodec();
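// Builds a single record for the given schema type; INT records hold the row
// number, STRING records hold the row number modulo 100 as text.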
private GenericRecord aDatum(Type ofType, int forRow) {
GenericRecord record = null;
switch (ofType) {
case STRING:
record = new GenericData.Record(STRINGSCHEMA);
record.put("value", String.valueOf(forRow % 100));
return record;
case INT:
record = new GenericData.Record(INTSCHEMA);
record.put("value", forRow);
return record;
default:
throw new AssertionError("Can't generate data for type " + ofType);
}
}
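// Writes ROWS_IN_INPUT_FILES records of the given type to a temporary Avro
// data file, applying the supplied file metadata and compression codec.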
private File generateData(String file, Type type, Map<String, String> metadata, CodecFactory codec) throws Exception {
File inputFile = AvroTestUtil.tempFile(getClass(), file);
inputFile.deleteOnExit();
Schema schema = null;
if(type.equals(Schema.Type.INT)) {
schema = INTSCHEMA;
}
if(type.equals(Schema.Type.STRING)) {
schema = STRINGSCHEMA;
}
DataFileWriter<Object> writer = new DataFileWriter<Object>(
new GenericDatumWriter<Object>(schema));
for(Entry<String, String> metadatum : metadata.entrySet()) {
writer.setMeta(metadatum.getKey(), metadatum.getValue());
}
writer.setCodec(codec);
writer.create(schema, inputFile);
for (int i = 0; i < ROWS_IN_INPUT_FILES; i++) {
writer.append(aDatum(type, i));
}
writer.close();
return inputFile;
}
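// Reads the first record of an Avro data file and returns its "value" field.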
private int getFirstIntDatum(File file) throws Exception {
DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(
new FileInputStream(file),
new GenericDatumReader<GenericRecord>());
int result = (Integer) reader.next().get(0);
System.out.println(result);
reader.close();
return result;
}
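// Counts the records in an Avro data file by iterating over all of them.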
private int numRowsInFile(File output) throws Exception {
DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(
new FileInputStream(output),
new GenericDatumReader<GenericRecord>());
Iterator<GenericRecord> rows = reader.iterator();
int rowcount = 0;
while(rows.hasNext()) {
++rowcount;
rows.next();
}
reader.close();
return rowcount;
}
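// Concatenates several input files, and then a whole input directory, into one
// output file, honoring --offset, --limit and --samplerate.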
@Test
public void testCat() throws Exception {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("myMetaKey", "myMetaValue");
File input1 = generateData("input1.avro", Type.INT, metadata, DEFLATE);
File input2 = generateData("input2.avro", Type.INT, metadata, SNAPPY);
File input3 = generateData("input3.avro", Type.INT, metadata, DEFLATE);
File output = AvroTestUtil.tempFile(getClass(), "out/default-output.avro");
output.deleteOnExit();
// file input
List<String> args = asList(
input1.getAbsolutePath(),
input2.getAbsolutePath(),
input3.getAbsolutePath(),
"--offset" , String.valueOf(OFFSET),
"--limit" , String.valueOf(LIMIT_WITHIN_INPUT_BOUNDS),
"--samplerate" , String.valueOf(SAMPLERATE),
output.getAbsolutePath());
int returnCode = new CatTool().run(
System.in,
System.out,
System.err,
args);
assertEquals(0, returnCode);
assertEquals(LIMIT_WITHIN_INPUT_BOUNDS, numRowsInFile(output));
// folder input
args = asList(
input1.getParentFile().getAbsolutePath(),
output.getAbsolutePath(),
"--offset" , String.valueOf(OFFSET),
"--limit" , String.valueOf(LIMIT_WITHIN_INPUT_BOUNDS));
returnCode = new CatTool().run(
System.in,
System.out,
System.err,
args);
assertEquals(0, returnCode);
assertEquals(LIMIT_WITHIN_INPUT_BOUNDS, numRowsInFile(output));
}
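// A --limit larger than the input should yield all rows remaining after the offset.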
@Test
public void testLimitOutOfBounds() throws Exception {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("myMetaKey", "myMetaValue");
File input1 = generateData("input1.avro", Type.INT, metadata, DEFLATE);
File output = AvroTestUtil.tempFile(getClass(), "out/default-output.avro");
output.deleteOnExit();
List<String> args = asList(
input1.getAbsolutePath(),
"--offset=" + String.valueOf(OFFSET),
"--limit=" + String.valueOf(LIMIT_OUT_OF_INPUT_BOUNDS),
output.getAbsolutePath());
int returnCode = new CatTool().run(
System.in,
System.out,
System.err,
args);
assertEquals(0, returnCode);
assertEquals(ROWS_IN_INPUT_FILES - OFFSET, numRowsInFile(output));
}
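// The number of sampled rows should be within roughly one row of
// (input size - offset) * samplerate.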
@Test
public void testSamplerateAccuracy() throws Exception {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("myMetaKey", "myMetaValue");
File input1 = generateData("input1.avro", Type.INT, metadata, DEFLATE);
File output = AvroTestUtil.tempFile(getClass(), "out/default-output.avro");
output.deleteOnExit();
List<String> args = asList(
input1.getAbsolutePath(),
output.getAbsolutePath(),
"--offset" , String.valueOf(OFFSET),
"--samplerate" , String.valueOf(SAMPLERATE));
int returnCode = new CatTool().run(
System.in,
System.out,
System.err,
args);
assertEquals(0, returnCode);
assertTrue("Outputsize is not roughly (Inputsize - Offset) * samplerate",
(ROWS_IN_INPUT_FILES - OFFSET)*SAMPLERATE - numRowsInFile(output) < 2);
assertTrue("", (ROWS_IN_INPUT_FILES - OFFSET)*SAMPLERATE - numRowsInFile(output) > -2);
}
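// The first record written to the output should be the one at the requested offset.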
@Test
public void testOffSetAccuracy() throws Exception {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("myMetaKey", "myMetaValue");
File input1 = generateData("input1.avro", Type.INT, metadata, DEFLATE);
File output = AvroTestUtil.tempFile(getClass(), "out/default-output.avro");
output.deleteOnExit();
List<String> args = asList(
input1.getAbsolutePath(),
"--offset" , String.valueOf(OFFSET),
"--limit" , String.valueOf(LIMIT_WITHIN_INPUT_BOUNDS),
"--samplerate" , String.valueOf(SAMPLERATE),
output.getAbsolutePath());
int returnCode = new CatTool().run(
System.in,
System.out,
System.err,
args);
assertEquals(0, returnCode);
assertEquals("output does not start at offset",
OFFSET, getFirstIntDatum(output));
}
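// An offset beyond the end of the input should produce an empty output file.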
@Test
public void testOffsetBiggerThanInput() throws Exception {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("myMetaKey", "myMetaValue");
File input1 = generateData("input1.avro", Type.INT, metadata, DEFLATE);
File output = AvroTestUtil.tempFile(getClass(), "out/default-output.avro");
output.deleteOnExit();
List<String> args = asList(
input1.getAbsolutePath(),
"--offset" , String.valueOf(ROWS_IN_INPUT_FILES + 1),
output.getAbsolutePath());
int returnCode = new CatTool().run(
System.in,
System.out,
System.err,
args);
assertEquals(0, returnCode);
assertEquals("output is not empty",
0, numRowsInFile(output));
}
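// With a sample rate too small to select any further rows, only the record at
// the offset should be emitted.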
@Test
public void testSamplerateSmallerThanInput() throws Exception {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("myMetaKey", "myMetaValue");
File input1 = generateData("input1.avro", Type.INT, metadata, DEFLATE);
File output = AvroTestUtil.tempFile(getClass(), "out/default-output.avro");
output.deleteOnExit();
List<String> args = asList(
input1.getAbsolutePath(),
output.getAbsolutePath(),
"--offset=" + new Integer(OFFSET).toString(),
"--samplerate=" + new Double(SAMPLERATE_TOO_SMALL).toString());
int returnCode = new CatTool().run(
System.in,
System.out,
System.err,
args);
assertEquals(0, returnCode);
assertEquals("output should only contain the record at offset",
OFFSET, getFirstIntDatum(output));
}
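// Concatenating files with different schemas is expected to fail with an IOException.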
@Test(expected = IOException.class)
public void testDifferentSchemasFail() throws Exception {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("myMetaKey", "myMetaValue");
File input1 = generateData("input1.avro", Type.STRING, metadata, DEFLATE);
File input2 = generateData("input2.avro", Type.INT, metadata, DEFLATE);
File output = AvroTestUtil.tempFile(getClass(), "out/default-output.avro");
output.deleteOnExit();
List<String> args = asList(
input1.getAbsolutePath(),
input2.getAbsolutePath(),
output.getAbsolutePath());
new CatTool().run(
System.in,
System.out,
System.err,
args);
}
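// Running the tool with no arguments should succeed and print usage help.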
@Test
public void testHelpfulMessageWhenNoArgsGiven() throws Exception {
ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
PrintStream out = new PrintStream(buffer);
int returnCode = new CatTool().run(
System.in,
out,
System.err,
Collections.<String>emptyList());
out.close(); // flushes too
assertEquals(0, returnCode);
assertTrue(
"should have lots of help",
buffer.toString().trim().length() > 200);
}
}