/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.avro;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import javax.validation.ConstraintViolationException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.AbstractFileInputOperatorTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
* <p>
* In this test class the emitTuples method is called twice to process the
* first input, since during window 0 the operator is set up and the input
* stream is initialized. The platform then calls emitTuples in successive
* windows.
* </p>
*/
public class AvroFileInputOperatorTest
{
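/** Inline Avro schema describing the Order records used throughout these tests. */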
private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\","
+ "\"type\":\"record\",\"doc\":\"Order schema\",\"name\":\"Order\","
+ "\"fields\":[{\"name\":\"orderId\",\"type\": \"long\"},"
+ "{\"name\":\"customerId\",\"type\": \"int\"},"
+ "{\"name\":\"total\",\"type\": \"double\"},"
+ "{\"name\":\"customerName\",\"type\": \"string\"}]}";
private static final String FILENAME = "/tmp/simpleorder.avro";
private static final String OTHER_FILE = "/tmp/simpleorder2.avro";
private static final String ERROR_FILE = "/tmp/errorFile.avro";
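// sinks capturing the operator's record output, completed-file notifications and error records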
CollectorTestSink<Object> output = new CollectorTestSink<Object>();
CollectorTestSink<Object> completedFilesPort = new CollectorTestSink<Object>();
CollectorTestSink<Object> errorRecordsPort = new CollectorTestSink<Object>();
AvroFileInputOperator avroFileInput = new AvroFileInputOperator();
private List<GenericRecord> recordList = null;
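/**
* Per-test fixture: provides an OperatorContext whose APPLICATION_PATH points at a
* method-specific target directory, a PortContext carrying the POJO tuple class,
* and cleanup of that directory once the test finishes.
*/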
public static class TestMeta extends TestWatcher
{
public String dir = null;
OperatorContext context;
PortContext portContext;
@Override
protected void starting(Description description)
{
String methodName = description.getMethodName();
String className = description.getClassName();
this.dir = "target/" + className + "/" + methodName;
Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(Context.DAGContext.APPLICATION_PATH, dir);
context = mockOperatorContext(1, attributes);
Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
portContext = new TestPortContext(portAttributes);
}
@Override
protected void finished(Description description)
{
try {
FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Rule
public TestMeta testMeta = new TestMeta();
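/**
* Reads a single Avro file and verifies that every record is emitted exactly once,
* that no error records are produced, and that one completed file is reported.
*/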
@Test
public void testSingleFileAvroReads() throws Exception
{
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
int cnt = 7;
createAvroInput(cnt);
writeAvroFile(new File(FILENAME));
avroFileInput.output.setSink(output);
avroFileInput.completedFilesPort.setSink(completedFilesPort);
avroFileInput.errorRecordsPort.setSink(errorRecordsPort);
avroFileInput.setDirectory(testMeta.dir);
avroFileInput.setup(testMeta.context);
avroFileInput.beginWindow(0);
avroFileInput.emitTuples();
avroFileInput.emitTuples();
Assert.assertEquals("Record count", cnt, avroFileInput.recordCount);
avroFileInput.endWindow();
Assert.assertEquals("number tuples", cnt, output.collectedTuples.size());
Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size());
Assert.assertEquals("Completed File", 1, completedFilesPort.collectedTuples.size());
avroFileInput.teardown();
}
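/**
* Delegates to the generic idempotency/checkpoint driver in
* AbstractFileInputOperatorTest, supplying Avro-specific file writing.
*/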
@Test
public void testIdempotencyWithCheckPoint() throws Exception
{
AbstractFileInputOperatorTest.testIdempotencyWithCheckPoint(new AvroFileInputOperator(),
new CollectorTestSink<String>(),
new AbstractFileInputOperatorTest.IdempotencyTestDriver<AvroFileInputOperator>()
{
@Override
public void writeFile(int count, String fileName) throws IOException
{
createAvroInput(count);
writeAvroFile(new File(fileName));
}
@Override
public void setSink(AvroFileInputOperator operator, Sink<?> sink)
{
TestUtils.setSink(operator.output, sink);
}
@Override
public String getDirectory()
{
return testMeta.dir;
}
@Override
public OperatorContext getContext()
{
return testMeta.context;
}
});
}
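/**
* Reads two Avro files across consecutive windows and verifies the per-window
* tuple counts and that both files are reported on the completed-files port.
*/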
@Test
public void testMultipleFileAvroReads() throws Exception
{
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
int cnt = 7;
createAvroInput(cnt);
writeAvroFile(new File(FILENAME));
writeAvroFile(new File(OTHER_FILE));
avroFileInput.output.setSink(output);
avroFileInput.completedFilesPort.setSink(completedFilesPort);
avroFileInput.errorRecordsPort.setSink(errorRecordsPort);
avroFileInput.setDirectory(testMeta.dir);
avroFileInput.setup(testMeta.context);
avroFileInput.beginWindow(0);
avroFileInput.emitTuples();
avroFileInput.endWindow();
avroFileInput.beginWindow(1);
avroFileInput.emitTuples();
Assert.assertEquals("number tuples after window 0", cnt, output.collectedTuples.size());
avroFileInput.emitTuples();
avroFileInput.endWindow();
Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size());
Assert.assertEquals("number tuples after window 1", 2 * cnt, output.collectedTuples.size());
Assert.assertEquals("Completed File", 2, completedFilesPort.collectedTuples.size());
avroFileInput.teardown();
}
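/**
* Feeds the operator a plain-text file that is not a valid Avro container and
* verifies that no tuples are emitted.
*/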
@Test
public void testInvalidFormatFailure() throws Exception
{
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
int cnt = 7;
writeErrorFile(cnt, new File(ERROR_FILE));
avroFileInput.output.setSink(output);
avroFileInput.setDirectory(testMeta.dir);
avroFileInput.setup(testMeta.context);
avroFileInput.beginWindow(0);
avroFileInput.emitTuples();
avroFileInput.emitTuples();
avroFileInput.endWindow();
Assert.assertEquals("number tuples after window 1", 0, output.collectedTuples.size());
avroFileInput.teardown();
}
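/** Populates recordList with cnt GenericRecords conforming to AVRO_SCHEMA. */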
private void createAvroInput(int cnt)
{
recordList = Lists.newArrayList();
Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
while (cnt > 0) {
GenericRecord rec = new GenericData.Record(schema);
rec.put("orderId", cnt * 1L);
rec.put("customerId", cnt * 2);
rec.put("total", cnt * 1.5);
rec.put("customerName", "*" + cnt + "*");
cnt--;
recordList.add(rec);
}
}
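/** Writes cnt plain-text lines (not valid Avro) and moves the file into the scanned directory. */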
private void writeErrorFile(int cnt, File errorFile) throws IOException
{
HashSet<String> lines = Sets.newHashSet();
for (int line = 0; line < cnt; line++) {
lines.add("f0" + "l" + line);
}
FileUtils.write(errorFile, StringUtils.join(lines, '\n'));
FileUtils.moveFileToDirectory(errorFile, new File(testMeta.dir), true);
}
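/** Serializes recordList into an Avro container file and moves it into the scanned directory. */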
private void writeAvroFile(File outputFile) throws IOException
{
Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, outputFile);
for (GenericRecord record : recordList) {
dataFileWriter.append(record);
}
dataFileWriter.close();
FileUtils.moveFileToDirectory(outputFile, new File(testMeta.dir), true);
}
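/**
* Runs AvroReaderApplication in local mode for ten seconds against two generated
* Avro files, failing the test on any constraint violation.
*/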
@Test
public void testApplication() throws Exception
{
try {
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
int cnt = 7;
createAvroInput(cnt);
writeAvroFile(new File(FILENAME));
createAvroInput(cnt - 2);
writeAvroFile(new File(OTHER_FILE));
avroFileInput.setDirectory(testMeta.dir);
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
AvroReaderApplication avroReaderApplication = new AvroReaderApplication();
avroReaderApplication.setAvroFileInputOperator(avroFileInput);
lma.prepareDAG(avroReaderApplication, conf);
LocalMode.Controller lc = lma.getController();
lc.run(10000); // runs for 10 seconds and quits
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
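/**
* Runs AvroToPojoApplication in local mode, converting the generated
* GenericRecords into SimpleOrder POJOs, failing the test on any constraint violation.
*/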
@Test
public void testApplicationWithPojoConversion() throws Exception
{
try {
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
int cnt = 7;
createAvroInput(cnt);
writeAvroFile(new File(FILENAME));
createAvroInput(cnt - 2);
writeAvroFile(new File(OTHER_FILE));
avroFileInput.setDirectory(testMeta.dir);
AvroToPojo avroToPojo = new AvroToPojo();
avroToPojo.setPojoClass(SimpleOrder.class);
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
AvroToPojoApplication avroToPojoApplication = new AvroToPojoApplication();
avroToPojoApplication.setAvroFileInputOperator(avroFileInput);
avroToPojoApplication.setAvroToPojo(avroToPojo);
lma.prepareDAG(avroToPojoApplication, conf);
LocalMode.Controller lc = lma.getController();
lc.run(10000); // runs for 10 seconds and quits
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
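/** Minimal DAG: AvroFileInputOperator feeding a console output operator. */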
public static class AvroReaderApplication implements StreamingApplication
{
AvroFileInputOperator avroFileInputOperator;
public AvroFileInputOperator getAvroFileInput()
{
return avroFileInputOperator;
}
public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator)
{
this.avroFileInputOperator = avroFileInputOperator;
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
AvroFileInputOperator avroInputOperator = dag.addOperator("avroInputOperator", getAvroFileInput());
ConsoleOutputOperator consoleOutput = dag.addOperator("GenericRecordOp", new ConsoleOutputOperator());
dag.addStream("pojo", avroInputOperator.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL);
}
}
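/** DAG that converts the incoming GenericRecords to SimpleOrder POJOs before printing them. */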
public static class AvroToPojoApplication implements StreamingApplication
{
AvroFileInputOperator avroFileInputOperator;
AvroToPojo avroToPojo;
public AvroFileInputOperator getAvroFileInput()
{
return avroFileInputOperator;
}
public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator)
{
this.avroFileInputOperator = avroFileInputOperator;
}
public void setAvroToPojo(AvroToPojo avroToPojo)
{
this.avroToPojo = avroToPojo;
}
public AvroToPojo getAvroToPojo()
{
return avroToPojo;
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
AvroFileInputOperator avroInputOperator = dag.addOperator("avroInputOperator", getAvroFileInput());
AvroToPojo avroToPojo = dag.addOperator("AvroToPojo", getAvroToPojo());
ConsoleOutputOperator consoleOutput = dag.addOperator("GenericRecordOp", new ConsoleOutputOperator());
dag.getMeta(avroToPojo).getMeta(avroToPojo.output).getAttributes().put(Context.PortContext.TUPLE_CLASS,
SimpleOrder.class);
dag.addStream("GenericRecords", avroInputOperator.output, avroToPojo.data).setLocality(Locality.THREAD_LOCAL);
dag.addStream("POJO", avroToPojo.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL);
}
}
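/** POJO matching AVRO_SCHEMA; set as the TUPLE_CLASS for the POJO conversion tests. */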
public static class SimpleOrder
{
private Integer customerId;
private Long orderId;
private Double total;
private String customerName;
public SimpleOrder()
{
}
public SimpleOrder(int customerId, long orderId, double total, String customerName)
{
setCustomerId(customerId);
setOrderId(orderId);
setTotal(total);
setCustomerName(customerName);
}
public String getCustomerName()
{
return customerName;
}
public void setCustomerName(String customerName)
{
this.customerName = customerName;
}
public Integer getCustomerId()
{
return customerId;
}
public void setCustomerId(Integer customerId)
{
this.customerId = customerId;
}
public Long getOrderId()
{
return orderId;
}
public void setOrderId(Long orderId)
{
this.orderId = orderId;
}
public Double getTotal()
{
return total;
}
public void setTotal(Double total)
{
this.total = total;
}
@Override
public String toString()
{
return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName="
+ customerName + "]";
}
}
}