| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.lib.db.jdbc; |
| |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.text.SimpleDateFormat; |
| import java.util.Calendar; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Exchanger; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Maps; |
| |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.lib.testbench.CollectorTestSink; |
| |
| import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; |
| |
| /** |
| * Test for {@link JDBCLookupCacheBackedOperator} |
| */ |
| public class JDBCLookupCacheBackedOperatorTest |
| { |
| private static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; |
| private static final String INMEM_DB_DRIVER = org.hsqldb.jdbcDriver.class.getName(); |
| protected static final String TABLE_NAME = "Test_Lookup_Cache"; |
| |
| protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator = |
| new TestJDBCLookupCacheBackedOperator(); |
| protected static CollectorTestSink<Object> sink = new CollectorTestSink<>(); |
| protected static final Map<Integer, String> mapping = Maps.newHashMap(); |
| |
| static { |
| mapping.put(1, "one"); |
| mapping.put(2, "two"); |
| mapping.put(3, "three"); |
| mapping.put(4, "four"); |
| mapping.put(5, "five"); |
| } |
| |
| protected static final transient Logger logger = LoggerFactory.getLogger( |
| JDBCLookupCacheBackedOperatorTest.class); |
| |
| private static final Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<>(); |
| |
| public static class TestJDBCLookupCacheBackedOperator extends JDBCLookupCacheBackedOperator<String> |
| { |
| |
| @Override |
| public Integer getKeyFromTuple(String tuple) |
| { |
| return Integer.parseInt(tuple); |
| } |
| |
| @Override |
| public Map<Object, Object> loadInitialData() |
| { |
| return null; |
| } |
| |
| @Override |
| protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException |
| { |
| putStatement.setInt(1, (Integer)key); |
| putStatement.setString(2, (String)value); |
| |
| } |
| |
| @Override |
| protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException |
| { |
| getStatement.setInt(1, (Integer)key); |
| } |
| |
| @Override |
| public Object processResultSet(ResultSet resultSet) throws SQLException |
| { |
| if (resultSet.next()) { |
| return resultSet.getString(1); |
| } |
| return null; |
| } |
| |
| @Override |
| protected String fetchInsertQuery() |
| { |
| return "INSERT INTO " + TABLE_NAME + " (col1, col2) VALUES (?, ?)"; |
| } |
| |
| @Override |
| protected String fetchGetQuery() |
| { |
| return "select col1, col2 from " + TABLE_NAME + " where col1 = ?"; |
| } |
| |
| @Override |
| public List<Object> getAll(List<Object> keys) |
| { |
| List<Object> values = super.getAll(keys); |
| try { |
| bulkValuesExchanger.exchange(values); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| return values; |
| } |
| |
| @Override |
| public void putAll(Map<Object, Object> m) |
| { |
| } |
| |
| @Override |
| public void remove(Object key) |
| { |
| } |
| } |
| |
| @Test |
| public void test() throws Exception |
| { |
| lookupCacheBackedOperator.beginWindow(0); |
| lookupCacheBackedOperator.input.process("1"); |
| lookupCacheBackedOperator.input.process("2"); |
| lookupCacheBackedOperator.endWindow(); |
| |
| // Check values send vs received |
| Assert.assertEquals("Number of emitted tuples", 2, sink.collectedTuples.size()); |
| |
| List<Object> bulk = bulkValuesExchanger.exchange(null, 30, TimeUnit.SECONDS); |
| Assert.assertEquals("bulk values retrieval", 2, bulk.size()); |
| } |
| |
| @BeforeClass |
| public static void setup() throws Exception |
| { |
| // This will load the JDBC driver, each DB has its own driver |
| Class.forName(INMEM_DB_DRIVER).newInstance(); |
| |
| Connection con = DriverManager.getConnection(INMEM_DB_URL); |
| Statement stmt = con.createStatement(); |
| |
| String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME |
| + " (col1 INTEGER, col2 VARCHAR(20))"; |
| |
| stmt.executeUpdate(createTable); |
| stmt.executeUpdate("Delete from " + TABLE_NAME); |
| |
| // populate the database |
| for (Map.Entry<Integer, String> entry : mapping.entrySet()) { |
| String insert = "INSERT INTO " + TABLE_NAME |
| + " (col1, col2) VALUES (" + entry.getKey() + ", '" |
| + entry.getValue() + "')"; |
| stmt.executeUpdate(insert); |
| } |
| |
| // Setup the operator |
| lookupCacheBackedOperator.getStore().setDatabaseUrl(INMEM_DB_URL); |
| lookupCacheBackedOperator.getStore().setDatabaseDriver(INMEM_DB_DRIVER); |
| |
| Calendar now = Calendar.getInstance(); |
| now.add(Calendar.SECOND, 5); |
| |
| SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); |
| lookupCacheBackedOperator.getCacheManager().setRefreshTime(format.format(now.getTime())); |
| |
| lookupCacheBackedOperator.output.setSink(sink); |
| |
| OperatorContext context = mockOperatorContext(7); |
| lookupCacheBackedOperator.setup(context); |
| } |
| |
| @AfterClass |
| public static void teardown() throws Exception |
| { |
| } |
| |
| } |