blob: 4a1c770d7b8d28ed26b4e4609ec51926d7c3f9e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.formatter;
import java.io.IOException;
import javax.validation.ConstraintViolationException;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.parser.CsvPOJOParserTest.Ad;
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
public class CsvFormatterTest
{
private static final String filename = "schema.json";
CsvFormatter operator;
CollectorTestSink<Object> validDataSink;
CollectorTestSink<String> invalidDataSink;
@Rule
public Watcher watcher = new Watcher();
public class Watcher extends TestWatcher
{
@Override
protected void starting(Description description)
{
super.starting(description);
operator = new CsvFormatter();
operator.setClazz(Ad.class);
operator.setSchema(SchemaUtils.jarResourceFileToString(filename));
validDataSink = new CollectorTestSink<Object>();
invalidDataSink = new CollectorTestSink<String>();
TestUtils.setSink(operator.out, validDataSink);
TestUtils.setSink(operator.err, invalidDataSink);
}
@Override
protected void finished(Description description)
{
super.finished(description);
operator.teardown();
}
}
@Test
public void testPojoReaderToCsv()
{
operator.setup(null);
Ad ad = new Ad();
ad.setCampaignId(9823);
ad.setAdId(1234);
ad.setAdName("ad");
ad.setBidPrice(1.2);
ad.setStartDate(
new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate());
ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate());
ad.setSecurityCode(12345678);
ad.setParentCampaign("CAMP_AD");
ad.setActive(true);
ad.setWeatherTargeted('y');
ad.setValid("valid");
operator.in.process(ad);
Assert.assertEquals(1, validDataSink.collectedTuples.size());
Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
String csvOp = (String)validDataSink.collectedTuples.get(0);
Assert.assertNotNull(csvOp);
Assert.assertEquals("1234,9823,ad,1.2,2015-01-01 00:00:00,01/01/2016,12345678,true,false,CAMP_AD,y,valid\r\n",
csvOp);
Assert.assertEquals(1, operator.getIncomingTuplesCount());
Assert.assertEquals(0, operator.getErrorTupleCount());
Assert.assertEquals(1, operator.getEmittedObjectCount());
}
@Test
public void testPojoReaderToCsvNullInput()
{
operator.setup(null);
operator.in.process(null);
Assert.assertEquals(0, validDataSink.collectedTuples.size());
Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
Assert.assertEquals(1, operator.getIncomingTuplesCount());
Assert.assertEquals(1, operator.getErrorTupleCount());
Assert.assertEquals(0, operator.getEmittedObjectCount());
}
@Test
public void testApplication() throws IOException, Exception
{
try {
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
lma.prepareDAG(new CsvParserApplication(), conf);
LocalMode.Controller lc = lma.getController();
lc.run(5000);// runs for 5 seconds and quits
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
public static class CsvParserApplication implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
PojoEmitter input = dag.addOperator("data", new PojoEmitter());
CsvFormatter formatter = dag.addOperator("formatter", new CsvFormatter());
dag.getMeta(formatter).getMeta(formatter.in).getAttributes().put(Context.PortContext.TUPLE_CLASS, Ad.class);
formatter.setSchema(SchemaUtils.jarResourceFileToString("schema.json"));
ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
ConsoleOutputOperator error = dag.addOperator("error", new ConsoleOutputOperator());
output.setDebug(true);
dag.addStream("input", input.output, formatter.in);
dag.addStream("output", formatter.out, output.input);
dag.addStream("err", formatter.err, error.input);
}
}
public static class PojoEmitter extends BaseOperator implements InputOperator
{
public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
@Override
public void emitTuples()
{
Ad ad = new Ad();
ad.setCampaignId(9823);
ad.setAdId(1234);
ad.setAdName("ad");
ad.setBidPrice(1.2);
ad.setStartDate(
new DateTime().withDate(2015, 1, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).toDate());
ad.setEndDate(new DateTime().withDate(2016, 1, 1).toDate());
ad.setSecurityCode(12345678);
ad.setParentCampaign("CAMP_AD");
ad.setActive(true);
ad.setWeatherTargeted('y');
ad.setValid("valid");
output.emit(ad);
}
}
}