blob: 067ea3942b9b9792c75e7af06f1630a9f0005cfa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.jdbc.thin;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
/**
* Test to check various transactional scenarios.
*/
public abstract class JdbcThinTransactionsAbstractComplexSelfTest extends JdbcThinAbstractSelfTest {
/** Client node index. */
static final int CLI_IDX = 1;
/**
* Closure to perform ordinary delete after repeatable read.
*/
private final IgniteInClosure<Connection> afterReadDel = new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where firstname = 'John'");
}
};
/**
* Closure to perform fast delete after repeatable read.
*/
private final IgniteInClosure<Connection> afterReadFastDel = new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where id = 1");
}
};
/**
* Closure to perform ordinary update after repeatable read.
*/
private final IgniteInClosure<Connection> afterReadUpdate = new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "UPDATE \"Person\".Person set firstname = 'Joe' where firstname = 'John'");
}
};
/**
* Closure to perform ordinary delete and rollback after repeatable read.
*/
private final IgniteInClosure<Connection> afterReadDelAndRollback = new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where firstname = 'John'");
rollback(conn);
}
};
/**
* Closure to perform fast delete after repeatable read.
*/
private final IgniteInClosure<Connection> afterReadFastDelAndRollback = new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where id = 1");
rollback(conn);
}
};
/**
* Closure to perform ordinary update and rollback after repeatable read.
*/
private final IgniteInClosure<Connection> afterReadUpdateAndRollback = new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "UPDATE \"Person\".Person set firstname = 'Joe' where firstname = 'John'");
rollback(conn);
}
};
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String testIgniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(testIgniteInstanceName);
CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<>("Person");
ccfg.setIndexedTypes(Integer.class, Person.class);
ccfg.getQueryEntities().iterator().next().setKeyFieldName("id");
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
ccfg.setCacheMode(CacheMode.PARTITIONED);
cfg.setCacheConfiguration(ccfg);
// Let the node with index 1 be client node.
cfg.setClientMode(F.eq(testIgniteInstanceName, getTestIgniteInstanceName(CLI_IDX)));
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
execute("ALTER TABLE \"Person\".person add if not exists cityid int");
execute("ALTER TABLE \"Person\".person add if not exists companyid int");
execute("CREATE TABLE City (id int primary key, name varchar, population int) WITH " +
"\"atomicity=transactional_snapshot,template=partitioned,backups=3,cache_name=City\"");
execute("CREATE TABLE Company (id int, \"cityid\" int, name varchar, primary key (id, \"cityid\")) WITH " +
"\"atomicity=transactional_snapshot,template=partitioned,backups=1,wrap_value=false,affinity_key=cityid," +
"cache_name=Company\"");
execute("CREATE TABLE Product (id int primary key, name varchar, companyid int) WITH " +
"\"atomicity=transactional_snapshot,template=partitioned,backups=2,cache_name=Product\"");
execute("CREATE INDEX IF NOT EXISTS prodidx ON Product(companyid)");
execute("CREATE INDEX IF NOT EXISTS persidx ON \"Person\".person(cityid)");
insertPerson(1, "John", "Smith", 1, 1);
insertPerson(2, "Mike", "Johns", 1, 2);
insertPerson(3, "Sam", "Jules", 2, 2);
insertPerson(4, "Alex", "Pope", 2, 3);
insertPerson(5, "Peter", "Williams", 2, 3);
insertCity(1, "Los Angeles", 5000);
insertCity(2, "Seattle", 1500);
insertCity(3, "New York", 12000);
insertCity(4, "Cupertino", 400);
insertCompany(1, "Microsoft", 2);
insertCompany(2, "Google", 3);
insertCompany(3, "Facebook", 1);
insertCompany(4, "Uber", 1);
insertCompany(5, "Apple", 4);
insertProduct(1, "Search", 2);
insertProduct(2, "Windows", 1);
insertProduct(3, "Mac", 5);
awaitPartitionMapExchange();
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGridsMultiThreaded(4);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
execute("DELETE FROM \"Person\".Person");
execute("DROP TABLE City");
execute("DROP TABLE Company");
execute("DROP TABLE Product");
super.afterTest();
}
/**
*
*/
@Test
public void testSingleDmlStatement() throws SQLException {
insertPerson(6, "John", "Doe", 2, 2);
assertEquals(Collections.singletonList(l(6, "John", "Doe", 2, 2)),
execute("SELECT * FROM \"Person\".Person where id = 6"));
}
/**
*
*/
@Test
public void testMultipleDmlStatements() throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
insertPerson(conn, 6, "John", "Doe", 2, 2);
// https://issues.apache.org/jira/browse/IGNITE-6938 - we can only see results of
// UPDATE of what we have not inserted ourselves.
execute(conn, "UPDATE \"Person\".person SET lastname = 'Jameson' where lastname = 'Jules'");
execute(conn, "DELETE FROM \"Person\".person where id = 5");
}
});
assertEquals(l(
l(3, "Sam", "Jameson", 2, 2),
l(6, "John", "Doe", 2, 2)
), execute("SELECT * FROM \"Person\".Person where id = 3 or id >= 5 order by id"));
}
/**
*
*/
@Test
public void testBatchDmlStatements() throws SQLException {
doBatchedInsert();
assertEquals(l(
l(6, "John", "Doe", 2, 2),
l(7, "Mary", "Lee", 1, 3)
), execute("SELECT * FROM \"Person\".Person where id > 5 order by id"));
}
/**
*
*/
@Test
public void testBatchDmlStatementsIntermediateFailure() throws SQLException {
insertPerson(6, "John", "Doe", 2, 2);
IgniteException e = (IgniteException)GridTestUtils.assertThrows(null, new Callable<Object>() {
@Override public Object call() throws Exception {
doBatchedInsert();
return null;
}
}, IgniteException.class, "Duplicate key during INSERT [key=KeyCacheObjectImpl " +
"[part=6, val=6, hasValBytes=true]]");
assertTrue(e.getCause() instanceof BatchUpdateException);
assertEquals(IgniteQueryErrorCode.DUPLICATE_KEY, ((BatchUpdateException)e.getCause()).getErrorCode());
assertTrue(e.getCause().getMessage().contains("Duplicate key during INSERT [key=KeyCacheObjectImpl " +
"[part=6, val=6, hasValBytes=true]]"));
// First we insert id 7, then 6. Still, 7 is not in the cache as long as the whole batch has failed inside tx.
assertEquals(Collections.emptyList(), execute("SELECT * FROM \"Person\".Person where id > 6 order by id"));
}
/**
*
*/
private void doBatchedInsert() throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
try {
try (PreparedStatement ps = conn.prepareStatement("INSERT INTO \"Person\".person " +
"(id, firstName, lastName, cityId, companyId) values (?, ?, ?, ?, ?)")) {
ps.setInt(1, 7);
ps.setString(2, "Mary");
ps.setString(3, "Lee");
ps.setInt(4, 1);
ps.setInt(5, 3);
ps.addBatch();
ps.setInt(1, 6);
ps.setString(2, "John");
ps.setString(3, "Doe");
ps.setInt(4, 2);
ps.setInt(5, 2);
ps.addBatch();
ps.executeBatch();
}
}
catch (SQLException e) {
throw new IgniteException(e);
}
}
});
}
/**
*
*/
@Test
public void testInsertAndQueryMultipleCaches() throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
insertCity(conn, 5, "St Petersburg", 6000);
insertCompany(conn, 6, "VK", 5);
insertPerson(conn, 6, "Peter", "Sergeev", 5, 6);
}
});
try (Connection c = connect("distributedJoins=true")) {
assertEquals(l(l(5, "St Petersburg", 6000, 6, 5, "VK", 6, "Peter", "Sergeev", 5, 6)),
execute(c, "SELECT * FROM City left join Company on City.id = Company.\"cityid\" " +
"left join \"Person\".Person p on City.id = p.cityid WHERE p.id = 6 or company.id = 6"));
}
}
/**
*
*/
@Test
public void testColocatedJoinSelectAndInsertInTransaction() throws SQLException {
// We'd like to put some Google into cities with over 1K population which don't have it yet
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
List<Integer> ids = flat(execute(conn, "SELECT distinct City.id from City left join Company c on " +
"City.id = c.\"cityid\" where population >= 1000 and c.name <> 'Google' order by City.id"));
assertEqualsCollections(l(1, 2), ids);
int i = 5;
for (int l : ids)
insertCompany(conn, ++i, "Google", l);
}
});
assertEqualsCollections(l("Los Angeles", "Seattle", "New York"), flat(execute("SELECT City.name from City " +
"left join Company c on city.id = c.\"cityid\" WHERE c.name = 'Google' order by City.id")));
}
/**
*
*/
@Test
public void testDistributedJoinSelectAndInsertInTransaction() throws SQLException {
try (Connection c = connect("distributedJoins=true")) {
// We'd like to put some Google into cities with over 1K population which don't have it yet
executeInTransaction(c, new TransactionClosure() {
@Override public void apply(Connection conn) {
List<?> res = flat(execute(conn, "SELECT p.id,p.name,c.id from Company c left join Product p on " +
"c.id = p.companyid left join City on city.id = c.\"cityid\" WHERE c.name <> 'Microsoft' " +
"and population < 1000"));
assertEqualsCollections(l(3, "Mac", 5), res);
insertProduct(conn, 4, (String)res.get(1), 1);
}
});
}
try (Connection c = connect("distributedJoins=true")) {
assertEqualsCollections(l("Windows", "Mac"), flat(execute(c, "SELECT p.name from Company c left join " +
"Product p on c.id = p.companyid WHERE c.name = 'Microsoft' order by p.id")));
}
}
/**
*
*/
@Test
public void testInsertFromExpression() throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
execute(conn, "insert into city (id, name, population) values (? + 1, ?, ?)",
8, "Moscow", 15000);
}
});
}
/**
*
*/
@Test
public void testAutoRollback() throws SQLException {
try (Connection c = connect()) {
begin(c);
insertPerson(c, 6, "John", "Doe", 2, 2);
}
// Connection has not hung on close and update has not been applied.
assertTrue(personCache().query(new SqlFieldsQuery("SELECT * FROM \"Person\".Person WHERE id = 6"))
.getAll().isEmpty());
}
/**
*
*/
@Test
public void testRepeatableReadWithConcurrentDelete() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where firstname = 'John'");
}
}, null);
}
/**
*
*/
@Test
public void testRepeatableReadWithConcurrentFastDelete() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where id = 1");
}
}, null);
}
/**
*
*/
@Test
public void testRepeatableReadWithConcurrentCacheRemove() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
personCache().remove(1);
}
}, null);
}
/**
*
*/
@Test
public void testRepeatableReadAndDeleteWithConcurrentDelete() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where firstname = 'John'");
}
}, afterReadDel);
}
/**
*
*/
@Test
public void testRepeatableReadAndDeleteWithConcurrentFastDelete() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where id = 1");
}
}, afterReadDel);
}
/**
*
*/
@Test
public void testRepeatableReadAndDeleteWithConcurrentCacheRemove() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
personCache().remove(1);
}
}, afterReadDel);
}
/**
*
*/
@Test
public void testRepeatableReadAndFastDeleteWithConcurrentDelete() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where firstname = 'John'");
}
}, afterReadFastDel);
}
/**
*
*/
@Test
public void testRepeatableReadAndFastDeleteWithConcurrentFastDelete() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where id = 1");
}
}, afterReadFastDel);
}
/**
*
*/
@Test
public void testRepeatableReadAndFastDeleteWithConcurrentCacheRemove() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
personCache().remove(1);
}
}, afterReadFastDel);
}
/**
*
*/
@Test
public void testRepeatableReadAndDeleteWithConcurrentDeleteAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where firstname = 'John'");
}
}, afterReadDelAndRollback);
}
/**
*
*/
@Test
public void testRepeatableReadAndDeleteWithConcurrentFastDeleteAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where id = 1");
}
}, afterReadDelAndRollback);
}
/**
*
*/
@Test
public void testRepeatableReadAndDeleteWithConcurrentCacheRemoveAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
personCache().remove(1);
}
}, afterReadDelAndRollback);
}
/**
*
*/
@Test
public void testRepeatableReadAndFastDeleteWithConcurrentDeleteAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where firstname = 'John'");
}
}, afterReadFastDelAndRollback);
}
/**
*
*/
@Test
public void testRepeatableReadAndFastDeleteWithConcurrentFastDeleteAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "DELETE FROM \"Person\".Person where id = 1");
}
}, afterReadFastDelAndRollback);
}
/**
*
*/
@Test
public void testRepeatableReadAndFastDeleteWithConcurrentCacheRemoveAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
personCache().remove(1);
}
}, afterReadFastDelAndRollback);
}
/**
*
*/
@Test
public void testRepeatableReadWithConcurrentUpdate() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "UPDATE \"Person\".Person SET lastname = 'Fix' where firstname = 'John'");
}
}, null);
}
/**
*
*/
@Test
public void testRepeatableReadWithConcurrentCacheReplace() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
Person p = new Person();
p.id = 1;
p.firstName = "Luke";
p.lastName = "Maxwell";
personCache().replace(1, p);
}
}, null);
}
/**
*
*/
@Test
public void testRepeatableReadAndUpdateWithConcurrentUpdate() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "UPDATE \"Person\".Person SET lastname = 'Fix' where firstname = 'John'");
}
}, afterReadUpdate);
}
/**
*
*/
@Test
public void testRepeatableReadAndUpdateWithConcurrentCacheReplace() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
Person p = new Person();
p.id = 1;
p.firstName = "Luke";
p.lastName = "Maxwell";
personCache().replace(1, p);
}
}, afterReadUpdate);
}
/**
*
*/
@Test
public void testRepeatableReadAndUpdateWithConcurrentUpdateAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
execute(conn, "UPDATE \"Person\".Person SET lastname = 'Fix' where firstname = 'John'");
}
}, afterReadUpdateAndRollback);
}
/**
*
*/
@Test
public void testRepeatableReadAndUpdateWithConcurrentCacheReplaceAndRollback() throws Exception {
doTestRepeatableRead(new IgniteInClosure<Connection>() {
@Override public void apply(Connection conn) {
Person p = new Person();
p.id = 1;
p.firstName = "Luke";
p.lastName = "Maxwell";
personCache().replace(1, p);
}
}, afterReadUpdateAndRollback);
}
/**
* Perform repeatable reads and concurrent changes.
* @param concurrentWriteClo Updating closure.
* @param afterReadClo Closure making write changes that should also be made inside repeatable read transaction
* (must yield an exception).
* @throws Exception if failed.
*/
private void doTestRepeatableRead(final IgniteInClosure<Connection> concurrentWriteClo,
final IgniteInClosure<Connection> afterReadClo) throws Exception {
final CountDownLatch repeatableReadLatch = new CountDownLatch(1);
final CountDownLatch initLatch = new CountDownLatch(1);
final IgniteInternalFuture<?> readFut = multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
List<?> before = flat(execute(conn, "SELECT * from \"Person\".Person where id = 1"));
assertEqualsCollections(l(1, "John", "Smith", 1, 1), before);
initLatch.countDown();
try {
U.await(repeatableReadLatch);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteException(e);
}
List<?> after = flat(execute(conn, "SELECT * from \"Person\".Person where id = 1"));
assertEqualsCollections(before, after);
if (afterReadClo != null)
afterReadClo.apply(conn);
}
});
return null;
}
}, 1);
IgniteInternalFuture<?> conModFut = multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
try {
U.await(initLatch);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteException(e);
}
concurrentWriteClo.apply(conn);
repeatableReadLatch.countDown();
}
});
return null;
}
}, 1);
conModFut.get();
if (afterReadClo != null) {
IgniteCheckedException ex = (IgniteCheckedException)GridTestUtils.assertThrows(null, new Callable() {
@Override public Object call() throws Exception {
readFut.get();
return null;
}
}, IgniteCheckedException.class, "Cannot serialize transaction due to write conflict");
assertTrue(X.hasCause(ex, SQLException.class));
assertTrue(X.getCause(ex).getMessage().contains("Cannot serialize transaction due to write conflict"));
}
else
readFut.get();
}
/**
* Create a new connection, a new transaction and run given closure in its scope.
* @param clo Closure.
* @throws SQLException if failed.
*/
private void executeInTransaction(TransactionClosure clo) throws SQLException {
try (Connection conn = connect()) {
executeInTransaction(conn, clo);
}
}
/**
* Create a new transaction and run given closure in its scope.
* @param conn Connection.
* @param clo Closure.
* @throws SQLException if failed.
*/
private void executeInTransaction(Connection conn, TransactionClosure clo) throws SQLException {
begin(conn);
clo.apply(conn);
commit(conn);
}
/**
* @return Auto commit strategy for this test.
*/
abstract boolean autoCommit();
/**
* @param c Connection to begin a transaction on.
*/
private void begin(Connection c) {
if (autoCommit())
execute(c, "BEGIN");
}
/**
* @param c Connection to begin a transaction on.
*/
private void commit(Connection c) throws SQLException {
if (autoCommit())
execute(c, "COMMIT");
else
c.commit();
}
/**
* @param c Connection to rollback a transaction on.
*/
private void rollback(Connection c) {
try {
if (autoCommit())
execute(c, "ROLLBACK");
else
c.rollback();
}
catch (SQLException e) {
throw new IgniteException(e);
}
}
/**
* @param sql Statement.
* @param args Arguments.
* @return Result set.
* @throws SQLException if failed.
*/
List<List<?>> execute(String sql, Object... args) throws SQLException {
try (Connection c = connect()) {
c.setAutoCommit(true);
return execute(c, sql, args);
}
}
/**
* @param sql Statement.
* @param args Arguments.
* @return Result set.
* @throws RuntimeException if failed.
*/
@Override protected List<List<?>> execute(Connection conn, String sql, Object... args) {
try {
return super.execute(conn, sql, args);
}
catch (SQLException e) {
throw new IgniteException(e);
}
}
/**
* @return New connection to default node.
* @throws SQLException if failed.
*/
private Connection connect() throws SQLException {
return connect(null);
}
/**
* @param params Connection parameters.
* @return New connection to default node.
* @throws SQLException if failed.
*/
private Connection connect(String params) throws SQLException {
Connection c = connect(node(), params);
c.setAutoCommit(false);
return c;
}
/**
* @param node Node to connect to.
* @param params Connection parameters.
* @return Thin JDBC connection to specified node.
*/
@Override protected Connection connect(IgniteEx node, String params) {
try {
return super.connect(node, params);
}
catch (SQLException e) {
throw new AssertionError(e);
}
}
/**
* @return Default node to fire queries from.
*/
private IgniteEx node() {
return grid(nodeIndex());
}
/**
* @return {@link Person} cache.
*/
private IgniteCache<Integer, Person> personCache() {
return node().cache("Person");
}
/**
* @return Node index to fire queries from.
*/
abstract int nodeIndex();
/**
* @param id New person's id.
* @param firstName First name.
* @param lastName Second name.
* @param cityId City id.
* @param companyId Company id.
* @throws SQLException if failed.
*/
private void insertPerson(final int id, final String firstName, final String lastName, final int cityId,
final int companyId) throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
insertPerson(conn, id, firstName, lastName, cityId, companyId);
}
});
}
/**
* @param c Connection.
* @param id New person's id.
* @param firstName First name.
* @param lastName Second name.
* @param cityId City id.
* @param companyId Company id.
*/
private void insertPerson(Connection c, int id, String firstName, String lastName, int cityId, int companyId) {
execute(c, "INSERT INTO \"Person\".person (id, firstName, lastName, cityId, companyId) values (?, ?, ?, ?, ?)",
id, firstName, lastName, cityId, companyId);
}
/**
* @param id New city's id.
* @param name City name.
* @param population Number of people.
* @throws SQLException if failed.
*/
private void insertCity(final int id, final String name, final int population) throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
insertCity(conn, id, name, population);
}
});
}
/**
* @param c Connection.
* @param id New city's id.
* @param name City name.
* @param population Number of people.
*/
private void insertCity(Connection c, int id, String name, int population) {
execute(c, "INSERT INTO city (id, name, population) values (?, ?, ?)", id, name, population);
}
/**
* @param id New company's id.
* @param name Company name.
* @param cityId City id.
* @throws SQLException if failed.
*/
private void insertCompany(final int id, final String name, final int cityId) throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
insertCompany(conn, id, name, cityId);
}
});
}
/**
* @param c Connection.
* @param id New company's id.
* @param name Company name.
* @param cityId City id.
*/
private void insertCompany(Connection c, int id, String name, int cityId) {
execute(c, "INSERT INTO company (id, name, \"cityid\") values (?, ?, ?)", id, name, cityId);
}
/**
* @param id New product's id.
* @param name Product name.
* @param companyId Company id..
* @throws SQLException if failed.
*/
private void insertProduct(final int id, final String name, final int companyId) throws SQLException {
executeInTransaction(new TransactionClosure() {
@Override public void apply(Connection conn) {
insertProduct(conn, id, name, companyId);
}
});
}
/**
* @param c Connection.
* @param id New product's id.
* @param name Product name.
* @param companyId Company id..
*/
private void insertProduct(Connection c, int id, String name, int companyId) {
execute(c, "INSERT INTO product (id, name, companyid) values (?, ?, ?)", id, name, companyId);
}
/**
* Person class.
*/
private static final class Person {
/** */
@QuerySqlField
public int id;
/** */
@QuerySqlField
public String firstName;
/** */
@QuerySqlField
public String lastName;
}
/**
* Closure to be executed in scope of a transaction.
*/
private abstract static class TransactionClosure implements IgniteInClosure<Connection> {
// No-op.
}
/**
* @return List of given arguments.
*/
private static List<?> l(Object... args) {
return F.asList(args);
}
/**
* Flatten rows.
* @param rows Rows.
* @return Rows as a single list.
*/
private static <T> List<T> flat(Collection<? extends Collection<?>> rows) {
return new ArrayList<>(F.flatCollections((Collection<? extends Collection<T>>)rows));
}
}