blob: 0a65f30a2cc71dbd31508132ad554ca5aa39ab11 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.db.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.testbench.CollectorTestSink;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
* Test for {@link JDBCLookupCacheBackedOperator}
*/
public class JDBCLookupCacheBackedOperatorTest
{
private static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
private static final String INMEM_DB_DRIVER = org.hsqldb.jdbcDriver.class.getName();
protected static final String TABLE_NAME = "Test_Lookup_Cache";
protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator =
new TestJDBCLookupCacheBackedOperator();
protected static CollectorTestSink<Object> sink = new CollectorTestSink<>();
protected static final Map<Integer, String> mapping = Maps.newHashMap();
static {
mapping.put(1, "one");
mapping.put(2, "two");
mapping.put(3, "three");
mapping.put(4, "four");
mapping.put(5, "five");
}
protected static final transient Logger logger = LoggerFactory.getLogger(
JDBCLookupCacheBackedOperatorTest.class);
private static final Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<>();
public static class TestJDBCLookupCacheBackedOperator extends JDBCLookupCacheBackedOperator<String>
{
@Override
public Integer getKeyFromTuple(String tuple)
{
return Integer.parseInt(tuple);
}
@Override
public Map<Object, Object> loadInitialData()
{
return null;
}
@Override
protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException
{
putStatement.setInt(1, (Integer)key);
putStatement.setString(2, (String)value);
}
@Override
protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException
{
getStatement.setInt(1, (Integer)key);
}
@Override
public Object processResultSet(ResultSet resultSet) throws SQLException
{
if (resultSet.next()) {
return resultSet.getString(1);
}
return null;
}
@Override
protected String fetchInsertQuery()
{
return "INSERT INTO " + TABLE_NAME + " (col1, col2) VALUES (?, ?)";
}
@Override
protected String fetchGetQuery()
{
return "select col1, col2 from " + TABLE_NAME + " where col1 = ?";
}
@Override
public List<Object> getAll(List<Object> keys)
{
List<Object> values = super.getAll(keys);
try {
bulkValuesExchanger.exchange(values);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return values;
}
@Override
public void putAll(Map<Object, Object> m)
{
}
@Override
public void remove(Object key)
{
}
}
@Test
public void test() throws Exception
{
lookupCacheBackedOperator.beginWindow(0);
lookupCacheBackedOperator.input.process("1");
lookupCacheBackedOperator.input.process("2");
lookupCacheBackedOperator.endWindow();
// Check values send vs received
Assert.assertEquals("Number of emitted tuples", 2, sink.collectedTuples.size());
List<Object> bulk = bulkValuesExchanger.exchange(null, 30, TimeUnit.SECONDS);
Assert.assertEquals("bulk values retrieval", 2, bulk.size());
}
@BeforeClass
public static void setup() throws Exception
{
// This will load the JDBC driver, each DB has its own driver
Class.forName(INMEM_DB_DRIVER).newInstance();
Connection con = DriverManager.getConnection(INMEM_DB_URL);
Statement stmt = con.createStatement();
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ " (col1 INTEGER, col2 VARCHAR(20))";
stmt.executeUpdate(createTable);
stmt.executeUpdate("Delete from " + TABLE_NAME);
// populate the database
for (Map.Entry<Integer, String> entry : mapping.entrySet()) {
String insert = "INSERT INTO " + TABLE_NAME
+ " (col1, col2) VALUES (" + entry.getKey() + ", '"
+ entry.getValue() + "')";
stmt.executeUpdate(insert);
}
// Setup the operator
lookupCacheBackedOperator.getStore().setDatabaseUrl(INMEM_DB_URL);
lookupCacheBackedOperator.getStore().setDatabaseDriver(INMEM_DB_DRIVER);
Calendar now = Calendar.getInstance();
now.add(Calendar.SECOND, 5);
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
lookupCacheBackedOperator.getCacheManager().setRefreshTime(format.format(now.getTime()));
lookupCacheBackedOperator.output.setSink(sink);
OperatorContext context = mockOperatorContext(7);
lookupCacheBackedOperator.setup(context);
}
@AfterClass
public static void teardown() throws Exception
{
}
}