| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cache.store.jdbc; |
| |
| import java.sql.Connection; |
| import java.sql.Date; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Types; |
| import java.util.Random; |
| import javax.cache.integration.CacheLoaderException; |
| import org.apache.commons.dbcp.DelegatingConnection; |
| import org.apache.commons.dbcp.DelegatingPreparedStatement; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; |
| import org.apache.ignite.cache.store.jdbc.model.Gender; |
| import org.apache.ignite.cache.store.jdbc.model.Person; |
| import org.apache.ignite.cache.store.jdbc.model.PersonKey; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.ConnectorConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.binary.BinaryMarshaller; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| |
| /** |
| * Class for {@link CacheJdbcPojoStore} tests. |
| */ |
| public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstractTest { |
| /** DB connection URL. */ |
| private static final String DFLT_CONN_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1"; |
| |
| /** Organization count. */ |
| private static final int ORGANIZATION_CNT = 1000; |
| |
| /** Person count. */ |
| private static final int PERSON_CNT = 100000; |
| |
| /** Fetch size. */ |
| public static final int FETCH_SZ = 42; |
| |
| /** Test cache name. */ |
| private static final String CACHE_NAME = "test-cache"; |
| |
| /** Flag indicating that tests should use transactional cache. */ |
| private static boolean transactional; |
| |
| /** Flag indicating that tests should use primitive classes like java.lang.Integer for keys. */ |
| protected static boolean builtinKeys; |
| |
| /** Flag indicating that classes for keys available on class path or not. */ |
| private static boolean noKeyClasses; |
| |
| /** Flag indicating that classes for values available on class path or not. */ |
| private static boolean noValClasses; |
| |
| /** Flag indicating that fetch size should be checked. */ |
| private static boolean checkFetchSize; |
| |
| /** Batch size to load in parallel. */ |
| private static int parallelLoadThreshold; |
| |
| /** |
| * @return Flag indicating that all internal SQL queries should use escaped identifiers. |
| */ |
| protected boolean sqlEscapeAll(){ |
| return false; |
| } |
| |
| /** |
| * @return Connection to test in-memory H2 database. |
| * @throws SQLException if failed to connect. |
| */ |
| protected Connection getConnection() throws SQLException { |
| return new DelegatingConnection(DriverManager.getConnection(DFLT_CONN_URL, "sa", "")) { |
| /** {@inheritDoc} */ |
| @Override public PreparedStatement prepareStatement(String sql) throws SQLException { |
| return new DelegatingPreparedStatement(this, super.prepareStatement(sql)) { |
| /** {@inheritDoc} */ |
| @Override public ResultSet executeQuery() throws SQLException { |
| if (checkFetchSize) |
| assertEquals(FETCH_SZ, getFetchSize()); |
| |
| return super.executeQuery(); |
| } |
| }; |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE); |
| |
| super.beforeTestsStarted(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| Connection conn = getConnection(); |
| |
| Statement stmt = conn.createStatement(); |
| |
| stmt.executeUpdate("DROP TABLE IF EXISTS Organization"); |
| stmt.executeUpdate("DROP TABLE IF EXISTS Person"); |
| |
| stmt.executeUpdate("CREATE TABLE Organization (" + |
| " id INTEGER PRIMARY KEY," + |
| " name VARCHAR(50)," + |
| " city VARCHAR(50))"); |
| |
| stmt.executeUpdate("CREATE TABLE Person (" + |
| " id INTEGER PRIMARY KEY," + |
| " org_id INTEGER," + |
| " birthday DATE," + |
| " name VARCHAR(50)," + |
| " gender VARCHAR(50))"); |
| |
| conn.commit(); |
| |
| U.closeQuiet(stmt); |
| |
| fillSampleDatabase(conn); |
| |
| U.closeQuiet(conn); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE); |
| |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| cfg.setCacheConfiguration(cacheConfiguration()); |
| |
| cfg.setMarshaller(marshaller()); |
| |
| ConnectorConfiguration connCfg = new ConnectorConfiguration(); |
| cfg.setConnectorConfiguration(connCfg); |
| |
| return cfg; |
| } |
| |
| /** |
| * @return Marshaller to be used in test. |
| */ |
| protected abstract Marshaller marshaller(); |
| |
| /** |
| * @return Types to be used in test. |
| */ |
| protected JdbcType[] storeTypes() { |
| JdbcType[] storeTypes = new JdbcType[2]; |
| |
| storeTypes[0] = new JdbcType(); |
| storeTypes[0].setCacheName(CACHE_NAME); |
| storeTypes[0].setDatabaseSchema("PUBLIC"); |
| storeTypes[0].setDatabaseTable("ORGANIZATION"); |
| |
| if (builtinKeys) { |
| storeTypes[0].setKeyType("java.lang.Long"); |
| storeTypes[0].setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id")); |
| } |
| else { |
| storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey" + (noKeyClasses ? "1" : "")); |
| storeTypes[0].setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id")); |
| } |
| |
| storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization" + (noValClasses ? "1" : "")); |
| |
| boolean escape = sqlEscapeAll(); |
| |
| storeTypes[0].setValueFields( |
| new JdbcTypeField(Types.INTEGER, escape ? "ID" : "Id", Integer.class, "id"), |
| new JdbcTypeField(Types.VARCHAR, escape ? "NAME" : "Name", String.class, "name"), |
| new JdbcTypeField(Types.VARCHAR, escape ? "CITY" : "City", String.class, "city")); |
| |
| storeTypes[1] = new JdbcType(); |
| storeTypes[1].setCacheName(CACHE_NAME); |
| storeTypes[1].setDatabaseSchema("PUBLIC"); |
| storeTypes[1].setDatabaseTable("PERSON"); |
| |
| if (builtinKeys) { |
| storeTypes[1].setKeyType("java.lang.Integer"); |
| storeTypes[1].setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Long.class, "id")); |
| } |
| else { |
| storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey" + (noKeyClasses ? "1" : "")); |
| storeTypes[1].setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id")); |
| } |
| |
| storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person" + (noValClasses ? "1" : "")); |
| storeTypes[1].setValueFields( |
| new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"), |
| new JdbcTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"), |
| new JdbcTypeField(Types.DATE, "BIRTHDAY", Date.class, "birthday"), |
| new JdbcTypeField(Types.VARCHAR, "NAME", String.class, "name"), |
| new JdbcTypeField(Types.VARCHAR, "GENDER", Gender.class, "gender")); |
| |
| return storeTypes; |
| } |
| |
| /** |
| * @return Cache configuration for test. |
| * @throws Exception In case when failed to create cache configuration. |
| */ |
| protected CacheConfiguration cacheConfiguration() throws Exception { |
| CacheConfiguration cc = defaultCacheConfiguration(); |
| |
| cc.setName(CACHE_NAME); |
| cc.setCacheMode(PARTITIONED); |
| cc.setAtomicityMode(transactional ? TRANSACTIONAL : ATOMIC); |
| cc.setWriteBehindEnabled(false); |
| cc.setStoreKeepBinary(storeKeepBinary()); |
| |
| CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>(); |
| |
| H2Dialect dialect = new H2Dialect(); |
| |
| dialect.setFetchSize(FETCH_SZ); |
| |
| storeFactory.setDialect(dialect); |
| storeFactory.setTypes(storeTypes()); |
| storeFactory.setDataSourceFactory(new H2DataSourceFactory()); // H2 DataSource factory. |
| storeFactory.setSqlEscapeAll(sqlEscapeAll()); |
| storeFactory.setParallelLoadCacheMinimumThreshold(parallelLoadThreshold); |
| |
| cc.setCacheStoreFactory(storeFactory); |
| cc.setReadThrough(true); |
| cc.setWriteThrough(true); |
| cc.setLoadPreviousValue(true); |
| |
| return cc; |
| } |
| |
| /** |
| * @return Flag indicate keep value in binary format or not. |
| */ |
| protected boolean storeKeepBinary(){ |
| return false; |
| } |
| |
| /** |
| * Fill in-memory database with sample data. |
| * |
| * @param conn Connection to database. |
| * @throws SQLException In case of filling database with sample data failed. |
| */ |
| protected void fillSampleDatabase(Connection conn) throws SQLException { |
| info("Start to fill sample database..."); |
| |
| PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)"); |
| |
| for (int i = 0; i < ORGANIZATION_CNT; i++) { |
| orgStmt.setInt(1, i); |
| orgStmt.setString(2, "name" + i); |
| orgStmt.setString(3, "city" + i % 10); |
| |
| orgStmt.addBatch(); |
| } |
| |
| orgStmt.executeBatch(); |
| |
| U.closeQuiet(orgStmt); |
| |
| conn.commit(); |
| |
| PreparedStatement prnStmt = conn.prepareStatement( |
| "INSERT INTO Person(id, org_id, birthday, name, gender) VALUES (?, ?, ?, ?, ?)"); |
| |
| Random rnd = new Random(); |
| |
| for (int i = 0; i < PERSON_CNT; i++) { |
| prnStmt.setInt(1, i); |
| prnStmt.setInt(2, i % 100); |
| prnStmt.setDate(3, Date.valueOf(String.format("%d-%d-%d", 1970 + rnd.nextInt(50), 1 + rnd.nextInt(11), 1 + rnd.nextInt(27)))); |
| prnStmt.setString(4, "name" + i); |
| prnStmt.setString(5, Gender.random().toString()); |
| |
| prnStmt.addBatch(); |
| } |
| |
| prnStmt.executeBatch(); |
| |
| conn.commit(); |
| |
| U.closeQuiet(prnStmt); |
| |
| info("Sample database prepared."); |
| } |
| |
| /** |
| * Start test grid with specified options. |
| * |
| * @param builtin {@code True} if keys are built in java types. |
| * @param noKeyCls {@code True} if keys classes are not on class path. |
| * @param noValCls {@code True} if values classes are not on class path. |
| * @param trn {@code True} if cache should be started in transactional mode. |
| * @param threshold Load batch size. |
| * @throws Exception If failed to start grid. |
| */ |
| protected void startTestGrid(boolean builtin, boolean noKeyCls, boolean noValCls, boolean trn, int threshold) throws Exception { |
| builtinKeys = builtin; |
| noKeyClasses = noKeyCls; |
| noValClasses = noValCls; |
| transactional = trn; |
| parallelLoadThreshold = threshold; |
| |
| startGrid(); |
| } |
| |
| /** |
| * Check that data was loaded correctly. |
| */ |
| protected void checkCacheLoad() { |
| IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME); |
| |
| checkFetchSize = true; |
| |
| c1.loadCache(null); |
| |
| checkFetchSize = false; |
| |
| assertEquals(ORGANIZATION_CNT + PERSON_CNT, c1.size()); |
| } |
| |
| /** |
| * Check that data was loaded correctly. |
| */ |
| protected void checkCacheLoadWithSql() { |
| IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME); |
| |
| checkFetchSize = true; |
| |
| c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", "select id, org_id, name, birthday, gender from Person"); |
| |
| checkFetchSize = false; |
| |
| assertEquals(PERSON_CNT, c1.size()); |
| } |
| |
| /** |
| * Checks that data was loaded correctly with prepared statement. |
| */ |
| protected void checkCacheLoadWithStatement() throws SQLException { |
| Connection conn = null; |
| |
| PreparedStatement stmt = null; |
| |
| try { |
| conn = getConnection(); |
| |
| conn.setAutoCommit(true); |
| |
| String qry = "select id, org_id, name, birthday, gender from Person"; |
| |
| stmt = conn.prepareStatement(qry); |
| |
| IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME); |
| |
| c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", stmt); |
| |
| assertEquals(PERSON_CNT, c1.size()); |
| } |
| finally { |
| U.closeQuiet(stmt); |
| |
| U.closeQuiet(conn); |
| } |
| |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCacheWithStatement() throws Exception { |
| startTestGrid(false, false, false, false, 512); |
| |
| checkCacheLoadWithStatement(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCacheWithStatementTx() throws Exception { |
| startTestGrid(false, false, false, true, 512); |
| |
| checkCacheLoadWithStatement(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCache() throws Exception { |
| startTestGrid(false, false, false, false, 512); |
| |
| checkCacheLoad(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCacheAll() throws Exception { |
| startTestGrid(false, false, false, false, ORGANIZATION_CNT + PERSON_CNT + 1); |
| |
| checkCacheLoad(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCacheWithSql() throws Exception { |
| startTestGrid(false, false, false, false, 512); |
| |
| checkCacheLoadWithSql(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCacheTx() throws Exception { |
| startTestGrid(false, false, false, true, 512); |
| |
| checkCacheLoad(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCacheWithSqlTx() throws Exception { |
| startTestGrid(false, false, false, true, 512); |
| |
| checkCacheLoadWithSql(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCachePrimitiveKeys() throws Exception { |
| startTestGrid(true, false, false, false, 512); |
| |
| checkCacheLoad(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCachePrimitiveKeysTx() throws Exception { |
| startTestGrid(true, false, false, true, 512); |
| |
| checkCacheLoad(); |
| } |
| |
| /** |
| * Check put in cache and store it in db. |
| * |
| * @throws Exception If failed. |
| */ |
| private void checkPutRemove() throws Exception { |
| boolean binaryMarshaller = marshaller() instanceof BinaryMarshaller || marshaller() == null; |
| |
| IgniteCache<Object, Person> c1 = grid().cache(CACHE_NAME); |
| |
| Connection conn = getConnection(); |
| try { |
| PreparedStatement stmt = conn.prepareStatement("SELECT ID, ORG_ID, BIRTHDAY, NAME, GENDER FROM PERSON WHERE ID = ?"); |
| |
| stmt.setInt(1, -1); |
| |
| ResultSet rs = stmt.executeQuery(); |
| |
| assertFalse("Unexpected non empty result set", rs.next()); |
| |
| U.closeQuiet(rs); |
| |
| Date testDate = Date.valueOf("2001-05-05"); |
| Gender testGender = Gender.random(); |
| |
| Person val = new Person(-1, -2, testDate, "Person-to-test-put-insert", 999, testGender); |
| |
| Object key = builtinKeys ? Integer.valueOf(-1) : new PersonKey(-1); |
| |
| // Test put-insert. |
| c1.put(key, val); |
| |
| rs = stmt.executeQuery(); |
| |
| assertTrue("Unexpected empty result set", rs.next()); |
| |
| assertEquals(-1, rs.getInt(1)); |
| assertEquals(-2, rs.getInt(2)); |
| assertEquals(testDate, rs.getDate(3)); |
| assertEquals("Person-to-test-put-insert", rs.getString(4)); |
| |
| assertEquals(testGender.toString(), |
| binaryMarshaller ? Gender.values()[rs.getInt(5)].toString(): rs.getString(5)); |
| |
| assertFalse("Unexpected more data in result set", rs.next()); |
| |
| U.closeQuiet(rs); |
| |
| // Test put-update. |
| testDate = Date.valueOf("2016-04-04"); |
| |
| c1.put(key, new Person(-1, -3, testDate, "Person-to-test-put-update", 999, testGender)); |
| |
| rs = stmt.executeQuery(); |
| |
| assertTrue("Unexpected empty result set", rs.next()); |
| |
| assertEquals(-1, rs.getInt(1)); |
| assertEquals(-3, rs.getInt(2)); |
| assertEquals(testDate, rs.getDate(3)); |
| assertEquals("Person-to-test-put-update", rs.getString(4)); |
| |
| assertEquals(testGender.toString(), |
| binaryMarshaller ? Gender.values()[rs.getInt(5)].toString(): rs.getString(5)); |
| |
| assertFalse("Unexpected more data in result set", rs.next()); |
| |
| // Test remove. |
| c1.remove(key); |
| |
| rs = stmt.executeQuery(); |
| |
| assertFalse("Unexpected non-empty result set", rs.next()); |
| |
| U.closeQuiet(rs); |
| } |
| finally { |
| U.closeQuiet(conn); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutRemoveBuiltIn() throws Exception { |
| startTestGrid(true, false, false, false, 512); |
| |
| checkPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutRemove() throws Exception { |
| startTestGrid(false, false, false, false, 512); |
| |
| checkPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutRemoveTxBuiltIn() throws Exception { |
| startTestGrid(true, false, false, true, 512); |
| |
| checkPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPutRemoveTx() throws Exception { |
| startTestGrid(false, false, false, true, 512); |
| |
| checkPutRemove(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadNotRegisteredType() throws Exception { |
| startTestGrid(false, false, false, false, 512); |
| |
| IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME); |
| |
| try { |
| checkFetchSize = true; |
| |
| c1.loadCache(null, "PersonKeyWrong", "SELECT * FROM Person"); |
| } |
| catch (CacheLoaderException e) { |
| String msg = e.getMessage(); |
| |
| assertTrue("Unexpected exception: " + msg, |
| ("Provided key type is not found in store or cache configuration " + |
| "[cache=" + CACHE_NAME + ", key=PersonKeyWrong]").equals(msg)); |
| } finally { |
| checkFetchSize = false; |
| } |
| } |
| } |