blob: ce9096abb465fad683409550b150e9e60b0bcc10 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.enrich;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.testbench.CollectorTestSink;
import org.apache.apex.malhar.lib.util.TestUtils;
import org.apache.commons.io.FileUtils;
import com.esotericsoftware.kryo.Kryo;
import com.google.common.collect.Maps;
public class FileEnrichmentTest
{
@Rule
public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
@Test
public void testEnrichmentOperator() throws IOException, InterruptedException
{
URL origUrl = this.getClass().getResource("/productmapping.txt");
URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt");
FileUtils.deleteQuietly(new File(fileUrl.getPath()));
FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
MapEnricher oper = new MapEnricher();
FSLoader store = new JsonFSLoader();
store.setFileName(fileUrl.toString());
oper.setLookupFields(Arrays.asList("productId"));
oper.setIncludeFields(Arrays.asList("productCategory"));
oper.setStore(store);
oper.setup(null);
/* File contains 6 entries, but operator one entry is duplicate,
* so cache should contains only 5 entries after scanning input file.
*/
//Assert.assertEquals("Number of mappings ", 7, oper.cache.size());
CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
oper.output.setSink(tmp);
oper.activate(null);
oper.beginWindow(0);
Map<String, Object> tuple = Maps.newHashMap();
tuple.put("productId", 3);
tuple.put("channelId", 4);
tuple.put("amount", 10.0);
Kryo kryo = new Kryo();
oper.input.process(kryo.copy(tuple));
oper.endWindow();
oper.deactivate();
/* Number of tuple, emitted */
Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
Map<String, Object> emitted = sink.collectedTuples.iterator().next();
/* The fields present in original event is kept as it is */
Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
/* Check if productCategory is added to the event */
Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory"));
Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
}
@Test
public void testEnrichmentOperatorDelimitedFSLoader() throws IOException, InterruptedException
{
URL origUrl = this.getClass().getResource("/productmapping-delim.txt");
URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping-delim1.txt");
FileUtils.deleteQuietly(new File(fileUrl.getPath()));
FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
MapEnricher oper = new MapEnricher();
DelimitedFSLoader store = new DelimitedFSLoader();
// store.setFieldDescription("productCategory:INTEGER,productId:INTEGER");
store.setFileName(fileUrl.toString());
store.setSchema(
"{\"separator\":\",\",\"fields\": [{\"name\": \"productCategory\",\"type\": \"Integer\"},{\"name\": \"productId\",\"type\": \"Integer\"},{\"name\": \"mfgDate\",\"type\": \"Date\",\"constraints\": {\"format\": \"dd/MM/yyyy\"}}]}");
oper.setLookupFields(Arrays.asList("productId"));
oper.setIncludeFields(Arrays.asList("productCategory", "mfgDate"));
oper.setStore(store);
oper.setup(null);
CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
oper.output.setSink(tmp);
oper.activate(null);
oper.beginWindow(0);
Map<String, Object> tuple = Maps.newHashMap();
tuple.put("productId", 3);
tuple.put("channelId", 4);
tuple.put("amount", 10.0);
Kryo kryo = new Kryo();
oper.input.process(kryo.copy(tuple));
oper.endWindow();
oper.deactivate();
/* Number of tuple, emitted */
Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
Map<String, Object> emitted = sink.collectedTuples.iterator().next();
/* The fields present in original event is kept as it is */
Assert.assertEquals("Number of fields in emitted tuple", 5, emitted.size());
Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
/* Check if productCategory is added to the event */
Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory"));
Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
/* Check if mfgDate is added to the event */
Assert.assertEquals("mfgDate is part of tuple", true, emitted.containsKey("productCategory"));
Date mfgDate = (Date)emitted.get("mfgDate");
Assert.assertEquals("value of day", 1, mfgDate.getDate());
Assert.assertEquals("value of month", 0, mfgDate.getMonth());
Assert.assertEquals("value of year", 2016, mfgDate.getYear() + 1900);
Assert.assertTrue(emitted.get("mfgDate") instanceof Date);
}
@Test
public void testEnrichmentOperatorFixedWidthFSLoader() throws IOException, InterruptedException
{
URL origUrl = this.getClass().getResource("/fixed-width-sample.txt");
MapEnricher oper = new MapEnricher();
FixedWidthFSLoader store = new FixedWidthFSLoader();
store.setFieldDescription(
"Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:STRING:40,Price:DOUBLE:8,Date:DATE:10:\"dd:mm:yyyy\"");
store.setHasHeader(true);
store.setPadding('_');
store.setFileName(origUrl.toString());
oper.setLookupFields(Arrays.asList("Year"));
oper.setIncludeFields(Arrays.asList("Year", "Make", "Model", "Price", "Date"));
oper.setStore(store);
oper.setup(null);
CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
oper.output.setSink(tmp);
oper.activate(null);
oper.beginWindow(0);
Map<String, Object> tuple = Maps.newHashMap();
tuple.put("Year", 1997);
Kryo kryo = new Kryo();
oper.input.process(kryo.copy(tuple));
oper.endWindow();
oper.deactivate();
oper.teardown();
/* Number of tuple, emitted */
Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
Map<String, Object> emitted = sink.collectedTuples.iterator().next();
/* The fields present in original event is kept as it is */
Assert.assertEquals("Number of fields in emitted tuple", 5, emitted.size());
Assert.assertEquals("Value of Year is 1997", tuple.get("Year"), emitted.get("Year"));
/* Check if Make is added to the event */
Assert.assertEquals("Make is part of tuple", true, emitted.containsKey("Make"));
Assert.assertEquals("Value of Make", "Ford", emitted.get("Make"));
/* Check if Model is added to the event */
Assert.assertEquals("Model is part of tuple", true, emitted.containsKey("Model"));
Assert.assertEquals("Value of Model", "E350", emitted.get("Model"));
/* Check if Price is added to the event */
Assert.assertEquals("Price is part of tuple", true, emitted.containsKey("Price"));
Assert.assertEquals("Value of Price is 3000", 3000.0, emitted.get("Price"));
Assert.assertTrue(emitted.get("Price") instanceof Double);
/* Check if Date is added to the event */
Assert.assertEquals("Date is part of tuple", true, emitted.containsKey("Date"));
Date mfgDate = (Date)emitted.get("Date");
Assert.assertEquals("value of day", 1, mfgDate.getDate());
Assert.assertEquals("value of month", 0, mfgDate.getMonth());
Assert.assertEquals("value of year", 2016, mfgDate.getYear() + 1900);
Assert.assertTrue(emitted.get("Date") instanceof Date);
}
}