blob: 5455fd7d5ceb4e03f6578af46a1997d0cf193cb1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.store.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoDataSourceFactory;
import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoStoreFactoryWithHangWriteAll;
import org.apache.ignite.cache.store.jdbc.model.TestPojo;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
import static org.apache.ignite.configuration.DataPageEvictionMode.RANDOM_LRU;
import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
/**
* Tests for {@link CacheJdbcPojoStore}.
*/
public class CacheJdbcPojoWriteBehindStoreWithCoalescingTest extends GridCommonAbstractTest {
/** */
private static final String DFLT_CONN_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1";
/** */
private boolean isHangOnWriteAll = false;
/** */
private boolean isSmallRegion = false;
/** */
private final AtomicBoolean testFailed = new AtomicBoolean(false);
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 20 * 60 * 1000; //20 min
}
/** */
public DataStorageConfiguration getDataStorageConfiguration() {
DataStorageConfiguration memCfg = new DataStorageConfiguration();
DataRegionConfiguration plc = new DataRegionConfiguration();
plc.setName("Default_Region");
plc.setPageEvictionMode(RANDOM_LRU);
if (isSmallRegion)
plc.setMaxSize(128L * 1024 * 1024); // 128 MB
else
plc.setMaxSize(1L * 1024 * 1024 * 1024); // 1GB
memCfg.setDefaultDataRegionConfiguration(plc);
memCfg.setWalMode(LOG_ONLY);
return memCfg;
}
/** */
public TestJdbcPojoDataSourceFactory getDataSourceFactory() {
TestJdbcPojoDataSourceFactory testJdbcPojoDataSourceFactory = new TestJdbcPojoDataSourceFactory();
testJdbcPojoDataSourceFactory.setURL("jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1");
testJdbcPojoDataSourceFactory.setUserName("sa");
testJdbcPojoDataSourceFactory.setPassword("");
return testJdbcPojoDataSourceFactory;
}
/** */
public JdbcType getJdbcType() {
JdbcType type = new JdbcType();
type.setCacheName("TEST_CACHE");
type.setKeyType(Integer.class);
type.setValueType(TestPojo.class);
type.setDatabaseSchema("PUBLIC");
type.setDatabaseTable("TEST_CACHE");
type.setKeyFields(new JdbcTypeField(java.sql.Types.INTEGER, "VALUE2", Integer.class, "value2"));
type.setValueFields(
new JdbcTypeField(java.sql.Types.VARCHAR, "VALUE1", String.class, "value1"),
new JdbcTypeField(java.sql.Types.DATE, "VALUE3", java.sql.Date.class, "value3")
);
return type;
}
/** */
public CacheJdbcPojoStoreFactory getStoreFactory() {
CacheJdbcPojoStoreFactory storeFactory = new CacheJdbcPojoStoreFactory();
storeFactory.setParallelLoadCacheMinimumThreshold(100);
storeFactory.setBatchSize(100);
storeFactory.setMaximumPoolSize(4);
storeFactory.setDataSourceFactory(getDataSourceFactory());
storeFactory.setDialect(new H2Dialect());
storeFactory.setTypes(getJdbcType());
return storeFactory;
}
/** */
public CacheJdbcPojoStoreFactory getStoreFactoryWithHangWriteAll() {
TestJdbcPojoStoreFactoryWithHangWriteAll storeFactory = new TestJdbcPojoStoreFactoryWithHangWriteAll();
storeFactory.setParallelLoadCacheMinimumThreshold(100);
storeFactory.setBatchSize(100);
storeFactory.setMaximumPoolSize(4);
storeFactory.setDataSourceFactory(getDataSourceFactory());
storeFactory.setDialect(new H2Dialect());
storeFactory.setTypes(getJdbcType());
return storeFactory;
}
/** */
public CacheConfiguration getCacheConfiguration() {
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setName("TEST_CACHE");
ccfg.setCacheMode(REPLICATED);
ccfg.setAtomicityMode(ATOMIC);
ccfg.setPartitionLossPolicy(READ_WRITE_SAFE);
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindBatchSize(1000);
QueryEntity queryEntity = new QueryEntity();
queryEntity.setKeyType("java.lang.Integer");
queryEntity.setValueType("org.apache.ignite.cache.store.jdbc.model.TestPojo");
queryEntity.setTableName("TEST_CACHE");
queryEntity.setKeyFieldName("value3");
Set<String> keyFiles = new HashSet<>();
keyFiles.add("value3");
queryEntity.setKeyFields(keyFiles);
LinkedHashMap<String, String> fields = new LinkedHashMap<>();
fields.put("value1", "java.lang.String");
fields.put("value2", "java.lang.Integer");
fields.put("value3", "java.sql.Date");
queryEntity.setFields(fields);
Map<String, String> aliases = new HashMap<>();
aliases.put("value1", "VALUE1");
aliases.put("value2", "VALUE2");
aliases.put("value3", "VALUE3");
queryEntity.setAliases(aliases);
ArrayList<QueryEntity> queryEntities = new ArrayList<>();
queryEntities.add(queryEntity);
ccfg.setQueryEntities(queryEntities);
return ccfg;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
//Data Storage
cfg.setDataStorageConfiguration(getDataStorageConfiguration());
//cache configuration
CacheConfiguration ccfg = getCacheConfiguration();
if (isHangOnWriteAll)
ccfg.setCacheStoreFactory(getStoreFactoryWithHangWriteAll());
else
ccfg.setCacheStoreFactory(getStoreFactory());
cfg.setCacheConfiguration(ccfg);
//discovery
TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder();
ArrayList<String> addrs = new ArrayList<>();
addrs.add("127.0.0.1:47500..47509");
tcpDiscoveryVmIpFinder.setAddresses(addrs);
tcpDiscoverySpi.setIpFinder(tcpDiscoveryVmIpFinder);
cfg.setDiscoverySpi(tcpDiscoverySpi);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
testFailed.set(false);
cleanPersistenceDir();
try {
Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
Statement stmt = conn.createStatement();
stmt.executeUpdate("DROP TABLE IF EXISTS TEST_CACHE");
stmt.executeUpdate("CREATE TABLE TEST_CACHE (" +
" VALUE2 INTEGER PRIMARY KEY," +
" VALUE1 VARCHAR(50)," +
" VALUE3 DATE" +
")"
);
conn.commit();
U.closeQuiet(stmt);
U.closeQuiet(conn);
} catch (SQLException ex) {
fail(ex.getMessage());
}
super.beforeTest();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
cleanPersistenceDir();
}
/** */
public void checkCacheStore(IgniteCache<Integer, TestPojo> cache) {
try {
Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(" SELECT * FROM TEST_CACHE");
int count = 0;
while (rs.next()) {
String value1 = rs.getString("VALUE1");
Integer value2 = rs.getInt("VALUE2");
java.sql.Date value3 = rs.getDate("VALUE3");
TestPojo pojo = cache.get(value2);
assertNotNull(pojo);
Calendar c1 = Calendar.getInstance();
c1.setTime(value3);
Calendar c2 = Calendar.getInstance();
c2.setTime(pojo.getValue3());
assertEquals(value1, pojo.getValue1());
assertEquals(value2, pojo.getValue2());
assertEquals(c1.get(Calendar.DAY_OF_YEAR), c2.get(Calendar.DAY_OF_YEAR));
assertEquals(c1.get(Calendar.YEAR), c2.get(Calendar.YEAR));
assertEquals(c1.get(Calendar.MONTH), c2.get(Calendar.MONTH));
count++;
}
assertEquals(count, cache.size());
U.closeQuiet(stmt);
U.closeQuiet(conn);
} catch (SQLException ex) {
fail();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testHangWriteAllWithCoalescing() throws Exception {
isHangOnWriteAll = true;
writeAllWithCoalescing();
}
/**
* @throws Exception If failed.
*/
@Test
public void testNormalWriteAllWithCoalescing() throws Exception {
isHangOnWriteAll = false;
writeAllWithCoalescing();
}
/**
* @throws Exception If failed.
*/
@Test
public void testReadWithCoalescingSmallRegionWithEviction() throws Exception {
isHangOnWriteAll = false;
isSmallRegion = true;
readWithCoalescing();
}
/**
* @throws Exception If failed.
*/
@Test
public void testReadWithCoalescingNormalRegion() throws Exception {
isHangOnWriteAll = false;
isSmallRegion = false;
readWithCoalescing();
}
/**
* @throws Exception If failed.
*/
@Test
public void testUpdateAndReadTheSameKeyWithCoalescing() throws Exception {
isHangOnWriteAll = false;
isSmallRegion = false;
updateAndReadWithCoalescingSameKey();
}
/**
* @throws Exception If failed.
*/
@Test
public void testUpdateAndReadTheSameKeyWithCoalescingHangWriteAll() throws Exception {
isHangOnWriteAll = true;
isSmallRegion = false;
updateAndReadWithCoalescingSameKey();
}
/**
* @throws Exception If failed.
*/
public void updateAndReadWithCoalescingSameKey() throws Exception {
Ignite ignite = startGrid(0);
ignite.cluster().active(true);
IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
AtomicInteger t1Count = new AtomicInteger(5);
AtomicInteger t2Count = new AtomicInteger(5);
Thread t1 = new Thread(new Runnable() {
@Override public void run() {
try {
while (t1Count.get() > 0) {
for (int i = 0; i < 200000; i++) {
TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
cache.put(1, next);
TestPojo ret = cache.get(1);
assertEquals(ret, next);
}
t1Count.decrementAndGet();
}
} catch (CacheException e) {
//ignore
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override public void run() {
try {
while (t2Count.get() > 0) {
for (int i = 200000; i < 400000; i++) {
TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
cache.put(2, next);
TestPojo ret = cache.get(2);
assertEquals(ret, next);
}
t2Count.decrementAndGet();
}
} catch (CacheException e) {
//ignore
}
}
});
TestErrorHandler handler = new TestErrorHandler();
t1.setUncaughtExceptionHandler(handler);
t2.setUncaughtExceptionHandler(handler);
t1.start();
t2.start();
t1.join();
t2.join();
assertFalse(testFailed.get());
}
/**
* @throws Exception If failed.
*/
public void readWithCoalescing() throws Exception {
Ignite ignite = startGrid(0);
ignite.cluster().active(true);
IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
AtomicInteger t1Count = new AtomicInteger(5);
AtomicInteger t2Count = new AtomicInteger(5);
Thread t1 = new Thread(new Runnable() {
@Override public void run() {
try {
while (t1Count.get() > 0) {
for (int i = 0; i < 200000; i++) {
TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
cache.put(i, next);
TestPojo ret = cache.get(i);
assertEquals(ret, next);
}
t1Count.decrementAndGet();
}
} catch (CacheException e) {
//ignore
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override public void run() {
try {
while (t2Count.get() > 0) {
for (int i = 200000; i < 400000; i++) {
TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
cache.put(i, next);
TestPojo ret = cache.get(i);
assertEquals(ret, next);
}
t2Count.decrementAndGet();
}
} catch (CacheException e) {
//ignore
}
}
});
TestErrorHandler handler = new TestErrorHandler();
t1.setUncaughtExceptionHandler(handler);
t2.setUncaughtExceptionHandler(handler);
t1.start();
t2.start();
t1.join();
t2.join();
assertFalse(testFailed.get());
}
/**
* @throws Exception If failed.
*/
public void writeAllWithCoalescing() throws Exception {
Ignite ignite = startGrid(0);
ignite.cluster().active(true);
IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
AtomicInteger t1Count = new AtomicInteger(10);
AtomicInteger t2Count = new AtomicInteger(10);
Thread t1 = new Thread(new Runnable() {
@Override public void run() {
try {
while (t1Count.get() > 0) {
for (int i = 0; i < 5000; i++)
cache.put(i, new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime())));
t1Count.decrementAndGet();
}
} catch (CacheException e) {
//ignore
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override public void run() {
try {
while (t2Count.get() > 0) {
for (int i = 0; i < 5000; i++)
cache.put(i, new TestPojo("UPDATE" + i, i, new java.sql.Date(new java.util.Date().getTime())));
try {
U.sleep(500);
}
catch (IgniteInterruptedCheckedException e) {
e.printStackTrace();
}
t2Count.decrementAndGet();
}
} catch (CacheException e) {
//ignore
}
}
});
t1.start();
t2.start();
//t1 should be completed before 10 seconds.
U.sleep(10_000);
assertEquals(0, t1Count.get());
t1.join();
t2.join();
assertEquals(0, t2Count.get());
//now wait for updates will be done on store size and check that the data set is the same
if (isHangOnWriteAll)
//max time -> 10000 updates that will be send by 1000 batches -> (10000 / 1000) * 10 = 100 seconds.
U.sleep(100_000);
else
U.sleep(10_000);
checkCacheStore(cache);
}
/**
* @throws Exception If failed.
*/
private class TestErrorHandler implements Thread.UncaughtExceptionHandler {
/** {@inheritDoc} */
@Override public void uncaughtException(Thread t, Throwable e) {
testFailed.set(true);
}
}
}