| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cache.store.jdbc; |
| |
| import java.io.ByteArrayInputStream; |
| import java.lang.reflect.Field; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Timestamp; |
| import java.sql.Types; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import javax.cache.integration.CacheWriterException; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.binary.BinaryObject; |
| import org.apache.ignite.binary.BinaryObjectBuilder; |
| import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; |
| import org.apache.ignite.cache.store.jdbc.model.BinaryTest; |
| import org.apache.ignite.cache.store.jdbc.model.BinaryTestKey; |
| import org.apache.ignite.cache.store.jdbc.model.Organization; |
| import org.apache.ignite.cache.store.jdbc.model.OrganizationKey; |
| import org.apache.ignite.cache.store.jdbc.model.Person; |
| import org.apache.ignite.cache.store.jdbc.model.PersonComplexKey; |
| import org.apache.ignite.cache.store.jdbc.model.PersonKey; |
| import org.apache.ignite.internal.binary.BinaryMarshaller; |
| import org.apache.ignite.internal.processors.cache.CacheEntryImpl; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiInClosure; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest; |
| import org.h2.jdbcx.JdbcConnectionPool; |
| import org.junit.Test; |
| |
| /** |
| * Tests for {@link CacheJdbcPojoStore} running against an in-memory H2 database. |
| */ |
| public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<CacheJdbcPojoStore<Object, Object>> { |
| /** JDBC URL of the in-memory H2 database ({@code autoCacheStore}) that backs the store under test. */ |
| private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1"; |
| |
| /** Number of rows {@link #testLoadCache()} inserts into the {@code Organization} table. */ |
| protected static final int ORGANIZATION_CNT = 1000; |
| |
| /** Number of rows {@link #testLoadCache()} inserts into each of the {@code Person} and {@code Person_Complex} tables. */ |
| protected static final int PERSON_CNT = 100000; |
| |
| /** Ignite instance read from the store's {@code ignite} field in {@link #beforeTest()}. */ |
| private Ignite ig; |
| |
| /** {@code true} when the Ignite configuration uses {@link BinaryMarshaller}; computed in {@link #beforeTest()}. */ |
| private boolean binaryEnable; |
| |
| /** |
| * Creates the test instance; the constructor body itself performs no initialization. |
| * |
| * @throws Exception If failed. |
| */ |
| public CacheJdbcPojoStoreTest() throws Exception { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheJdbcPojoStore<Object, Object> store() { |
| CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>(); |
| |
| JdbcType[] storeTypes = new JdbcType[7]; |
| |
| storeTypes[0] = new JdbcType(); |
| storeTypes[0].setDatabaseSchema("PUBLIC"); |
| storeTypes[0].setDatabaseTable("ORGANIZATION"); |
| storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey"); |
| storeTypes[0].setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id")); |
| |
| storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization"); |
| storeTypes[0].setValueFields( |
| new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"), |
| new JdbcTypeField(Types.VARCHAR, "NAME", String.class, "name"), |
| new JdbcTypeField(Types.VARCHAR, "CITY", String.class, "city")); |
| |
| storeTypes[1] = new JdbcType(); |
| storeTypes[1].setDatabaseSchema("PUBLIC"); |
| storeTypes[1].setDatabaseTable("PERSON"); |
| storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey"); |
| storeTypes[1].setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id")); |
| |
| storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); |
| storeTypes[1].setValueFields( |
| new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"), |
| new JdbcTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"), |
| new JdbcTypeField(Types.VARCHAR, "NAME", String.class, "name")); |
| |
| storeTypes[2] = new JdbcType(); |
| storeTypes[2].setDatabaseSchema("PUBLIC"); |
| storeTypes[2].setDatabaseTable("PERSON_COMPLEX"); |
| storeTypes[2].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonComplexKey"); |
| storeTypes[2].setKeyFields( |
| new JdbcTypeField(Types.INTEGER, "ID", int.class, "id"), |
| new JdbcTypeField(Types.INTEGER, "ORG_ID", int.class, "orgId"), |
| new JdbcTypeField(Types.INTEGER, "CITY_ID", int.class, "cityId")); |
| |
| storeTypes[2].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); |
| storeTypes[2].setValueFields( |
| new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"), |
| new JdbcTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"), |
| new JdbcTypeField(Types.VARCHAR, "NAME", String.class, "name"), |
| new JdbcTypeField(Types.INTEGER, "SALARY", Integer.class, "salary")); |
| |
| storeTypes[3] = new JdbcType(); |
| storeTypes[3].setDatabaseSchema("PUBLIC"); |
| storeTypes[3].setDatabaseTable("TIMESTAMP_ENTRIES"); |
| storeTypes[3].setKeyType("java.sql.Timestamp"); |
| storeTypes[3].setKeyFields(new JdbcTypeField(Types.TIMESTAMP, "KEY", Timestamp.class, null)); |
| |
| storeTypes[3].setValueType("java.lang.Integer"); |
| storeTypes[3].setValueFields(new JdbcTypeField(Types.INTEGER, "VAL", Integer.class, null)); |
| |
| storeTypes[4] = new JdbcType(); |
| storeTypes[4].setDatabaseSchema("PUBLIC"); |
| storeTypes[4].setDatabaseTable("STRING_ENTRIES"); |
| storeTypes[4].setKeyType("java.lang.String"); |
| storeTypes[4].setKeyFields(new JdbcTypeField(Types.VARCHAR, "KEY", String.class, null)); |
| |
| storeTypes[4].setValueType("java.lang.String"); |
| storeTypes[4].setValueFields(new JdbcTypeField(Types.VARCHAR, "VAL", Integer.class, null)); |
| |
| storeTypes[5] = new JdbcType(); |
| storeTypes[5].setDatabaseSchema("PUBLIC"); |
| storeTypes[5].setDatabaseTable("UUID_ENTRIES"); |
| storeTypes[5].setKeyType("java.util.UUID"); |
| storeTypes[5].setKeyFields(new JdbcTypeField(Types.BINARY, "KEY", UUID.class, null)); |
| |
| storeTypes[5].setValueType("java.util.UUID"); |
| storeTypes[5].setValueFields(new JdbcTypeField(Types.BINARY, "VAL", UUID.class, null)); |
| |
| storeTypes[6] = new JdbcType(); |
| storeTypes[6].setDatabaseSchema("PUBLIC"); |
| storeTypes[6].setDatabaseTable("BINARY_ENTRIES"); |
| storeTypes[6].setKeyType("org.apache.ignite.cache.store.jdbc.model.BinaryTestKey"); |
| storeTypes[6].setKeyFields(new JdbcTypeField(Types.BINARY, "KEY", Integer.class, "id")); |
| |
| storeTypes[6].setValueType("org.apache.ignite.cache.store.jdbc.model.BinaryTest"); |
| storeTypes[6].setValueFields(new JdbcTypeField(Types.BINARY, "VAL", byte[].class, "bytes")); |
| |
| storeFactory.setTypes(storeTypes); |
| |
| storeFactory.setDialect(new H2Dialect()); |
| |
| CacheJdbcPojoStore<Object, Object> store = storeFactory.create(); |
| |
| // H2 DataSource |
| store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", "")); |
| |
| return store; |
| } |
| |
| /** |
| * @param store Store. |
| * @throws Exception If failed. |
| */ |
| @Override protected void inject(CacheJdbcPojoStore<Object, Object> store) throws Exception { |
| getTestResources().inject(store); |
| |
| GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "ses", ses); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| Connection conn = store.openConnection(false); |
| |
| Statement stmt = conn.createStatement(); |
| |
| try { |
| stmt.executeUpdate("delete from String_Entries"); |
| } |
| catch (SQLException ignore) { |
| // No-op. |
| } |
| |
| try { |
| stmt.executeUpdate("delete from UUID_Entries"); |
| } |
| catch (SQLException ignore) { |
| // No-op. |
| } |
| |
| try { |
| stmt.executeUpdate("delete from Organization"); |
| } |
| catch (SQLException ignore) { |
| // No-op. |
| } |
| |
| try { |
| stmt.executeUpdate("delete from Person"); |
| } |
| catch (SQLException ignore) { |
| // No-op. |
| } |
| |
| try { |
| stmt.executeUpdate("delete from Timestamp_Entries"); |
| } |
| catch (SQLException ignore) { |
| // No-op. |
| } |
| |
| try { |
| stmt.executeUpdate("delete from Binary_Entries"); |
| } |
| catch (SQLException ignore) { |
| // No-op. |
| } |
| |
| stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + |
| "String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))"); |
| |
| stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + |
| "UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))"); |
| |
| stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + |
| "Binary_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))"); |
| |
| stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + |
| "Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))"); |
| |
| stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + |
| "Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))"); |
| |
| stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + |
| "Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))"); |
| |
| stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + |
| "Person_Complex (id integer not null, org_id integer not null, city_id integer not null, " + |
| "name varchar(50), salary integer, PRIMARY KEY(id, org_id, city_id))"); |
| |
| conn.commit(); |
| |
| U.closeQuiet(stmt); |
| |
| U.closeQuiet(conn); |
| |
| super.beforeTest(); |
| |
| Ignite ig = U.field(store, "ignite"); |
| |
| this.ig = ig; |
| |
| binaryEnable = ig.configuration().getMarshaller() instanceof BinaryMarshaller; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLoadCache() throws Exception { |
| Connection conn = store.openConnection(false); |
| |
| PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)"); |
| |
| for (int i = 0; i < ORGANIZATION_CNT; i++) { |
| orgStmt.setInt(1, i); |
| orgStmt.setString(2, "name" + i); |
| orgStmt.setString(3, "city" + i % 10); |
| |
| orgStmt.addBatch(); |
| } |
| |
| orgStmt.executeBatch(); |
| |
| U.closeQuiet(orgStmt); |
| |
| conn.commit(); |
| |
| PreparedStatement prnStmt = conn.prepareStatement("INSERT INTO Person(id, org_id, name) VALUES (?, ?, ?)"); |
| |
| for (int i = 0; i < PERSON_CNT; i++) { |
| prnStmt.setInt(1, i); |
| prnStmt.setInt(2, i % 100); |
| prnStmt.setString(3, "name" + i); |
| |
| prnStmt.addBatch(); |
| } |
| |
| prnStmt.executeBatch(); |
| |
| conn.commit(); |
| |
| U.closeQuiet(prnStmt); |
| |
| PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)"); |
| |
| for (int i = 0; i < PERSON_CNT; i++) { |
| prnComplexStmt.setInt(1, i); |
| prnComplexStmt.setInt(2, i % 500); |
| prnComplexStmt.setInt(3, i % 100); |
| prnComplexStmt.setString(4, "name" + i); |
| |
| if (i > 0) |
| prnComplexStmt.setInt(5, 1000 + i * 500); |
| else // Add person with null salary |
| prnComplexStmt.setNull(5, Types.INTEGER); |
| |
| prnComplexStmt.addBatch(); |
| } |
| |
| prnComplexStmt.executeBatch(); |
| |
| U.closeQuiet(prnComplexStmt); |
| |
| conn.commit(); |
| |
| U.closeQuiet(prnStmt); |
| |
| PreparedStatement binaryStmt = conn.prepareStatement("INSERT INTO Binary_Entries(key, val) VALUES (?, ?)"); |
| |
| byte[] bytes = new byte[16]; |
| |
| for (byte i = 0; i < 16; i++) |
| bytes[i] = i; |
| |
| binaryStmt.setInt(1, 1); |
| binaryStmt.setBinaryStream(2, new ByteArrayInputStream(bytes)); |
| |
| binaryStmt.addBatch(); |
| binaryStmt.executeBatch(); |
| |
| U.closeQuiet(binaryStmt); |
| |
| conn.commit(); |
| |
| U.closeQuiet(conn); |
| |
| final Collection<Object> orgKeys = new ConcurrentLinkedQueue<>(); |
| final Collection<Object> prnKeys = new ConcurrentLinkedQueue<>(); |
| final Collection<Object> prnComplexKeys = new ConcurrentLinkedQueue<>(); |
| final Collection<Object> binaryTestVals = new ConcurrentLinkedQueue<>(); |
| |
| IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { |
| @Override public void apply(Object k, Object v) { |
| if (binaryEnable){ |
| if (k instanceof BinaryObject && v instanceof BinaryObject) { |
| BinaryObject key = (BinaryObject)k; |
| BinaryObject val = (BinaryObject)v; |
| |
| String keyType = key.type().typeName(); |
| String valType = val.type().typeName(); |
| |
| if (OrganizationKey.class.getName().equals(keyType) |
| && Organization.class.getName().equals(valType)) |
| orgKeys.add(key); |
| |
| if (PersonKey.class.getName().equals(keyType) |
| && Person.class.getName().equals(valType)) |
| prnKeys.add(key); |
| |
| if (PersonComplexKey.class.getName().equals(keyType) |
| && Person.class.getName().equals(valType)) |
| prnComplexKeys.add(key); |
| |
| if (BinaryTestKey.class.getName().equals(keyType) |
| && BinaryTest.class.getName().equals(valType)) |
| binaryTestVals.add(val.field("bytes")); |
| } |
| }else { |
| if (k instanceof OrganizationKey && v instanceof Organization) |
| orgKeys.add(k); |
| else if (k instanceof PersonKey && v instanceof Person) |
| prnKeys.add(k); |
| else if (k instanceof BinaryTestKey && v instanceof BinaryTest) |
| binaryTestVals.add(((BinaryTest)v).getBytes()); |
| else if (k instanceof PersonComplexKey && v instanceof Person) { |
| PersonComplexKey key = (PersonComplexKey)k; |
| |
| Person val = (Person)v; |
| |
| assertTrue("Key ID should be the same as value ID", key.getId() == val.getId()); |
| assertTrue("Key orgID should be the same as value orgID", key.getOrgId() == val.getOrgId()); |
| assertEquals("name" + key.getId(), val.getName()); |
| |
| prnComplexKeys.add(k); |
| } |
| } |
| } |
| }; |
| |
| store.loadCache(c); |
| |
| assertEquals(ORGANIZATION_CNT, orgKeys.size()); |
| assertEquals(PERSON_CNT, prnKeys.size()); |
| assertEquals(PERSON_CNT, prnComplexKeys.size()); |
| assertEquals(1, binaryTestVals.size()); |
| assertTrue(Arrays.equals(bytes, (byte[])binaryTestVals.iterator().next())); |
| |
| Collection<Object> tmpOrgKeys = new ArrayList<>(orgKeys); |
| Collection<Object> tmpPrnKeys = new ArrayList<>(prnKeys); |
| Collection<Object> tmpPrnComplexKeys = new ArrayList<>(prnComplexKeys); |
| |
| orgKeys.clear(); |
| prnKeys.clear(); |
| prnComplexKeys.clear(); |
| |
| store.loadCache( |
| c, OrganizationKey.class.getName(), "SELECT name, city, id FROM ORGANIZATION", |
| PersonKey.class.getName(), "SELECT org_id, id, name FROM Person WHERE id < 1000"); |
| |
| assertEquals(ORGANIZATION_CNT, orgKeys.size()); |
| assertEquals(1000, prnKeys.size()); |
| assertEquals(0, prnComplexKeys.size()); |
| |
| store.deleteAll(tmpOrgKeys); |
| store.deleteAll(tmpPrnKeys); |
| store.deleteAll(tmpPrnComplexKeys); |
| |
| orgKeys.clear(); |
| prnKeys.clear(); |
| prnComplexKeys.clear(); |
| |
| store.loadCache(c); |
| |
| assertTrue(orgKeys.isEmpty()); |
| assertTrue(prnKeys.isEmpty()); |
| assertTrue(prnComplexKeys.isEmpty()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testParallelLoad() throws Exception { |
| Connection conn = store.openConnection(false); |
| |
| PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)"); |
| |
| for (int i = 0; i < 8; i++) { |
| |
| prnComplexStmt.setInt(1, (i >> 2) & 1); |
| prnComplexStmt.setInt(2, (i >> 1) & 1); |
| prnComplexStmt.setInt(3, i % 2); |
| |
| prnComplexStmt.setString(4, "name"); |
| prnComplexStmt.setInt(5, 1000 + i * 500); |
| |
| prnComplexStmt.addBatch(); |
| } |
| |
| prnComplexStmt.executeBatch(); |
| |
| U.closeQuiet(prnComplexStmt); |
| |
| conn.commit(); |
| |
| U.closeQuiet(conn); |
| |
| final Collection<Object> prnComplexKeys = new ConcurrentLinkedQueue<>(); |
| |
| IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { |
| @Override public void apply(Object k, Object v) { |
| if (binaryEnable) { |
| if (k instanceof BinaryObject && v instanceof BinaryObject) { |
| BinaryObject key = (BinaryObject)k; |
| BinaryObject val = (BinaryObject)v; |
| |
| String keyType = key.type().typeName(); |
| String valType = val.type().typeName(); |
| |
| if (PersonComplexKey.class.getName().equals(keyType) |
| && Person.class.getName().equals(valType)) |
| prnComplexKeys.add(key); |
| } |
| } |
| else { |
| if (k instanceof PersonComplexKey && v instanceof Person) |
| prnComplexKeys.add(k); |
| else |
| fail("Unexpected entry [key=" + k + ", value=" + v + "]"); |
| } |
| } |
| }; |
| |
| store.setParallelLoadCacheMinimumThreshold(2); |
| |
| store.loadCache(c); |
| |
| assertEquals(8, prnComplexKeys.size()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testWriteRetry() throws Exception { |
| CacheJdbcPojoStore<Object, Object> store = store(); |
| |
| // Special dialect that will skip updates, to test write retry. |
| store.setDialect(new H2Dialect() { |
| /** {@inheritDoc} */ |
| @Override public boolean hasMerge() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String updateQuery(String tblName, Collection<String> keyCols, |
| Iterable<String> valCols) { |
| return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0"; |
| } |
| }); |
| |
| inject(store); |
| |
| Connection conn = store.openConnection(false); |
| |
| PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)"); |
| |
| orgStmt.setInt(1, 1); |
| orgStmt.setString(2, "name" + 1); |
| orgStmt.setString(3, "city" + 1); |
| |
| orgStmt.executeUpdate(); |
| |
| U.closeQuiet(orgStmt); |
| |
| conn.commit(); |
| |
| OrganizationKey k1 = new OrganizationKey(1); |
| Organization v1 = new Organization(1, "Name1", "City1"); |
| |
| ses.newSession(null); |
| |
| try { |
| store.write(new CacheEntryImpl<>(wrap(k1), wrap(v1))); |
| |
| fail("CacheWriterException wasn't thrown."); |
| } |
| catch (CacheWriterException e) { |
| if (!e.getMessage().startsWith("Failed insert entry in database, violate a unique index or primary key") || |
| e.getSuppressed().length != 2) |
| throw e; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTimestamp() throws Exception { |
| Timestamp k = new Timestamp(System.currentTimeMillis()); |
| |
| ses.newSession(null); |
| |
| Integer v = 5; |
| |
| store.write(new CacheEntryImpl<>(k, v)); |
| |
| assertEquals(v, store.load(k)); |
| |
| store.delete(k); |
| |
| assertNull(store.load(k)); |
| } |
| |
| /** |
| * @param obj Object. |
| */ |
| private Object wrap(Object obj) throws IllegalAccessException { |
| if (binaryEnable) { |
| Class<?> cls = obj.getClass(); |
| |
| BinaryObjectBuilder builder = ig.binary().builder(cls.getName()); |
| |
| for (Field f : cls.getDeclaredFields()) { |
| if (f.getName().contains("serialVersionUID")) |
| continue; |
| |
| f.setAccessible(true); |
| |
| builder.setField(f.getName(), f.get(obj)); |
| } |
| |
| return builder.build(); |
| } |
| |
| return obj; |
| } |
| } |