blob: d5463cb0ab4c1db3674b14c4ace48849c8775c6a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.mongodb;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
import com.datatorrent.api.DAG;
import static org.apache.apex.malhar.lib.helper.OperatorContextTestHelper.mockOperatorContext;
public class MongoDBOutputOperatorTest
{
private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperatorTest.class);
public String[] hashMapping1 = new String[columnNum];
public String[] arrayMapping1 = new String[columnNum];
public static final int maxTuple = 20;
public static final int columnNum = 5;
public AttributeMap attrmap = new DefaultAttributeMap();
private static final int inputCount = 6;
public void buildDataset()
{
hashMapping1[0] = "prop1:t1.col1:INT";
hashMapping1[1] = "prop3:t1.col3:STRING";
hashMapping1[2] = "prop2:t1.col2:DATE";
hashMapping1[3] = "prop4:t2.col1:STRING";
hashMapping1[4] = "prop5:t2.col2:INT";
arrayMapping1[0] = "t1.col1:INT";
arrayMapping1[1] = "t1.col3:STRING";
arrayMapping1[2] = "t1.col2:DATE";
arrayMapping1[3] = "t2.col2:STRING";
arrayMapping1[4] = "t2.col1:INT";
attrmap.put(DAG.APPLICATION_ID, "myMongoDBOouputOperatorAppId");
}
public HashMap<String, Object> generateHashMapData(int j, MongoDBHashMapOutputOperator oper)
{
HashMap<String, Object> hm = new HashMap<String, Object>();
for (int i = 0; i < columnNum; i++) {
String[] tokens = hashMapping1[i].split("[:]");
String[] subtok = tokens[1].split("[.]");
String table = subtok[0];
String prop = tokens[0];
String type = tokens[2];
if (type.contains("INT")) {
hm.put(prop, j * columnNum + i);
} else if (type.equals("STRING")) {
hm.put(prop, String.valueOf(j * columnNum + i));
} else if (type.equals("DATE")) {
hm.put(prop, new Date());
}
oper.propTableMap.put(prop, table);
}
return hm;
}
public ArrayList<Object> generateArrayListData(int j, MongoDBArrayListOutputOperator oper)
{
ArrayList<Object> al = new ArrayList<Object>();
for (int i = 0; i < columnNum; i++) {
String[] tokens = arrayMapping1[i].split("[:]");
String[] subtok = tokens[0].split("[.]");
String table = subtok[0];
String prop = subtok[1];
String type = tokens[1];
if (type.contains("INT")) {
al.add(j * columnNum + i);
} else if (type.equals("STRING")) {
al.add(String.valueOf(j * columnNum + i));
} else if (type.equals("DATE")) {
al.add(new Date());
}
}
return al;
}
public void readDB(MongoDBOutputOperator oper)
{
for (Object o: oper.getTableList()) {
String table = (String)o;
DBCursor cursor = oper.db.getCollection(table).find();
while (cursor.hasNext()) {
logger.debug("{}", cursor.next());
}
}
}
@Test
public void MongoDBHashMapOutputOperatorTest()
{
buildDataset();
MongoDBHashMapOutputOperator<Object> oper = new MongoDBHashMapOutputOperator<Object>();
oper.setBatchSize(3);
oper.setHostName("localhost");
oper.setDataBase("test");
oper.setUserName("test");
oper.setPassWord("123");
oper.setWindowIdColumnName("winid");
oper.setOperatorIdColumnName("operatorid");
oper.setMaxWindowTable("maxWindowTable");
oper.setQueryFunction(1);
oper.setColumnMapping(hashMapping1);
oper.setup(mockOperatorContext(1));
for (Object o: oper.getTableList()) {
String table = (String)o;
oper.db.getCollection(table).drop();
}
oper.beginWindow(oper.getLastWindowId() + 1);
logger.debug("beginwindow {}", oper.getLastWindowId() + 1);
for (int i = 0; i < maxTuple; ++i) {
HashMap<String, Object> hm = generateHashMapData(i, oper);
// logger.debug(hm.toString());
oper.inputPort.process(hm);
}
oper.endWindow();
readDB(oper);
oper.teardown();
}
@Test
public void MongoDBArrayListOutputOperatorTest()
{
buildDataset();
MongoDBArrayListOutputOperator oper = new MongoDBArrayListOutputOperator();
oper.setBatchSize(3);
oper.setHostName("localhost");
oper.setDataBase("test");
oper.setUserName("test");
oper.setPassWord("123");
oper.setWindowIdColumnName("winid");
oper.setOperatorIdColumnName("operatorid");
oper.setMaxWindowTable("maxWindowTable");
oper.setQueryFunction(1);
oper.setColumnMapping(arrayMapping1);
oper.setup(mockOperatorContext(2));
for (Object o: oper.getTableList()) {
String table = (String)o;
oper.db.getCollection(table).drop();
}
oper.beginWindow(oper.getLastWindowId() + 1);
logger.debug("beginwindow {}", oper.getLastWindowId() + 1);
for (int i = 0; i < maxTuple; ++i) {
ArrayList<Object> al = generateArrayListData(i, oper);
oper.inputPort.process(al);
}
oper.endWindow();
readDB(oper);
oper.teardown();
}
@Test
public void MongoDBPOJOOperatorTest()
{
buildDataset();
MongoDBPOJOOutputOperator oper = new MongoDBPOJOOutputOperator();
oper.setBatchSize(3);
oper.setHostName("localhost");
oper.setDataBase("test1");
oper.setUserName("test");
oper.setPassWord("123");
oper.setWindowIdColumnName("winid");
oper.setOperatorIdColumnName("operatorid");
oper.setMaxWindowTable("maxWindowTable");
oper.setQueryFunction(1);
ArrayList<String> tablenames = new ArrayList<String>();
for (int i = 0; i < inputCount; i++) {
tablenames.add("t1");
}
oper.tableList.add("t1");
oper.setTablenames(tablenames);
ArrayList<String> fieldTypes = new ArrayList<String>();
fieldTypes.add("java.lang.String");
fieldTypes.add("java.lang.String");
fieldTypes.add("java.lang.Integer");
fieldTypes.add("java.util.Map");
fieldTypes.add("java.lang.String");
fieldTypes.add("int");
oper.setFieldTypes(fieldTypes);
ArrayList<String> keys = new ArrayList<String>();
keys.add("id");
keys.add("name");
keys.add("age");
keys.add("mapping");
keys.add("address.city");
keys.add("address.housenumber");
oper.setKeys(keys);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("getId()");
expressions.add("getName()");
expressions.add("getAge()");
expressions.add("getMapping()");
expressions.add("getAddress().getCity()");
expressions.add("getAddress().getHousenumber()");
oper.setExpressions(expressions);
oper.setup(mockOperatorContext(2));
for (String table: oper.getTableList()) {
logger.debug("table in test is {}", table);
oper.db.getCollection(table).drop();
}
Assert.assertEquals("size of tablenames", inputCount, tablenames.size());
Assert.assertEquals("size of tables is same as size of expressions",tablenames.size(), expressions.size());
Assert.assertEquals("size of keys is same as size of expressions",keys.size(), expressions.size());
Assert.assertEquals("size of fieldTypes is same as size of expressions",fieldTypes.size(), expressions.size());
oper.beginWindow(oper.getLastWindowId() + 1);
logger.debug("beginwindow {}", oper.getLastWindowId() + 1);
for (int i = 0; i < 3; i++) {
TestPOJO al = new TestPOJO();
al.setId("test" + i);
al.setName("testname" + i);
al.setAge(i);
TestPOJO.Address address = new TestPOJO.Address();
address.setCity("testcity" + i);
address.setHousenumber(i + 10);
HashMap<String, Object> hmap = new HashMap<String, Object>();
hmap.put("key" + i, i);
al.setMapping(hmap);
al.setAddress(address);
oper.inputPort.process(al);
}
oper.endWindow();
for (String table: oper.getTableList()) {
DBCursor cursor = oper.db.getCollection(table).find();
DBObject obj = cursor.next();
Assert.assertEquals("id is ", "test0", obj.get("id"));
Assert.assertEquals("name is", "testname0", obj.get("name"));
Assert.assertEquals("age is ", 0, obj.get("age"));
Assert.assertEquals("mapping is", "{ \"key0\" : 0}", obj.get("mapping").toString());
Assert.assertEquals("address is", "{ \"city\" : \"testcity0\" , \"housenumber\" : 10}", obj.get("address").toString());
obj = cursor.next();
Assert.assertEquals("id is ", "test1", obj.get("id"));
Assert.assertEquals("name is", "testname1", obj.get("name"));
Assert.assertEquals("age is ", 1, obj.get("age"));
Assert.assertEquals("mapping is", "{ \"key1\" : 1}", obj.get("mapping").toString());
Assert.assertEquals("address is", "{ \"city\" : \"testcity1\" , \"housenumber\" : 11}", obj.get("address").toString());
obj = cursor.next();
Assert.assertEquals("id is ", "test2", obj.get("id"));
Assert.assertEquals("name is", "testname2", obj.get("name"));
Assert.assertEquals("age is ", 2, obj.get("age"));
Assert.assertEquals("mapping is", "{ \"key2\" : 2}", obj.get("mapping").toString());
Assert.assertEquals("address is", "{ \"city\" : \"testcity2\" , \"housenumber\" : 12}", obj.get("address").toString());
}
oper.teardown();
}
}