blob: 18222fd21f7528a76395a59708c463fe478eaaf4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionSerializationException;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
*
*/
public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
/** */
protected static final ObjectCodec<Integer> INTEGER_CODEC = new IntegerCodec();
/** */
protected static final ObjectCodec<MvccTestAccount> ACCOUNT_CODEC = new AccountCodec();
/** */
static final int DFLT_PARTITION_COUNT = RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
/** */
static final String CRD_ATTR = "testCrd";
/** */
static final long DFLT_TEST_TIME = SF.applyLB(30_000, 3_000);
/** */
protected static final int PAGE_SIZE = DataStorageConfiguration.DFLT_PAGE_SIZE;
/** */
protected static final int SRVS = 4;
/** */
protected boolean client;
/** */
protected boolean testSpi;
/** */
protected String nodeAttr;
/** */
protected boolean persistence;
/** */
protected CacheConfiguration ccfg;
/** */
protected CacheConfiguration[] ccfgs;
/** */
protected boolean disableScheduledVacuum;
/** */
protected static final int TX_TIMEOUT = 3000;
/**
* @return Cache mode.
*/
protected abstract CacheMode cacheMode();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setFailureHandler(new StopNodeFailureHandler());
if (disableScheduledVacuum)
cfg.setMvccVacuumFrequency(Integer.MAX_VALUE);
if (testSpi)
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
cfg.setClientMode(client);
assert (ccfg == null) || (ccfgs == null);
if (ccfg != null)
cfg.setCacheConfiguration(ccfg);
if (ccfgs != null)
cfg.setCacheConfiguration(ccfgs);
if (nodeAttr != null)
cfg.setUserAttributes(F.asMap(nodeAttr, true));
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
storageCfg.setWalMode(WALMode.LOG_ONLY);
storageCfg.setPageSize(PAGE_SIZE);
DataRegionConfiguration regionCfg = new DataRegionConfiguration();
regionCfg.setPersistenceEnabled(persistence);
regionCfg.setMaxSize(64L * 1024 * 1024);
storageCfg.setDefaultDataRegionConfiguration(regionCfg);
cfg.setDataStorageConfiguration(storageCfg);
cfg.setConsistentId(gridName);
cfg.setTransactionConfiguration(new TransactionConfiguration()
.setDefaultTxConcurrency(TransactionConcurrency.PESSIMISTIC)
.setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ));
return cfg;
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return DFLT_TEST_TIME + 60_000;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
ccfg = null;
ccfgs = null;
MvccProcessorImpl.coordinatorAssignClosure(null);
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
persistence = false;
try {
verifyOldVersionsCleaned();
verifyCoordinatorInternalState();
}
finally {
stopAllGrids();
MvccProcessorImpl.coordinatorAssignClosure(null);
cleanPersistenceDir();
}
super.afterTest();
}
/**
* @param cfgC Optional closure applied to cache configuration.
* @throws Exception If failed.
*/
final void cacheRecreate(@Nullable IgniteInClosure<CacheConfiguration> cfgC) throws Exception {
Ignite srv0 = startGrid(0);
final int PARTS = 64;
CacheConfiguration<Object, Object> ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, PARTS);
if (cfgC != null)
cfgC.apply(ccfg);
IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg);
for (int k = 0; k < PARTS * 2; k++) {
assertNull(cache.get(k));
int vals = k % 3 + 1;
for (int v = 0; v < vals; v++)
cache.put(k, new MvccTestAccount(v, 1));
assertEquals(vals - 1, cache.get(k).val);
}
srv0.destroyCache(cache.getName());
ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, PARTS);
if (cfgC != null)
cfgC.apply(ccfg);
cache = (IgniteCache)srv0.createCache(ccfg);
for (int k = 0; k < PARTS * 2; k++) {
assertNull(cache.get(k));
int vals = k % 3 + 2;
for (int v = 0; v < vals; v++)
cache.put(k, new MvccTestAccount(v + 100, 1));
assertEquals(vals - 1 + 100, cache.get(k).val);
}
srv0.destroyCache(cache.getName());
ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, PARTS);
IgniteCache<Long, Long> cache0 = (IgniteCache)srv0.createCache(ccfg);
for (long k = 0; k < PARTS * 2; k++) {
assertNull(cache0.get(k));
int vals = (int)(k % 3 + 2);
for (long v = 0; v < vals; v++)
cache0.put(k, v);
assertEquals((long)(vals - 1), (Object)cache0.get(k));
}
}
/**
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
* @param cfgC Optional closure applied to cache configuration.
* @param withRmvs If {@code true} then in addition to puts tests also executes removes.
* @param readMode Read mode.
* @param writeMode Write mode.
* @throws Exception If failed.
*/
final void accountsTxReadAll(
final int srvs,
final int clients,
int cacheBackups,
int cacheParts,
@Nullable IgniteInClosure<CacheConfiguration> cfgC,
final boolean withRmvs,
final ReadMode readMode,
final WriteMode writeMode
) throws Exception {
accountsTxReadAll(srvs, clients, cacheBackups, cacheParts, cfgC, withRmvs, readMode, writeMode, DFLT_TEST_TIME, null);
}
/**
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
* @param cfgC Optional closure applied to cache configuration.
* @param withRmvs If {@code true} then in addition to puts tests also executes removes.
* @param readMode Read mode.
* @param writeMode Write mode.
* @param testTime Test time.
* @throws Exception If failed.
*/
final void accountsTxReadAll(
final int srvs,
final int clients,
int cacheBackups,
int cacheParts,
@Nullable IgniteInClosure<CacheConfiguration> cfgC,
final boolean withRmvs,
final ReadMode readMode,
final WriteMode writeMode,
long testTime,
RestartMode restartMode
) throws Exception {
final int ACCOUNTS = 20;
final int ACCOUNT_START_VAL = 1000;
final int writers = 4;
final int readers = 4;
final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
@Override public void apply(IgniteCache<Object, Object> cache) {
final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
if (writeMode == WriteMode.PUT) {
Map<Integer, MvccTestAccount> accounts = new HashMap<>();
for (int i = 0; i < ACCOUNTS; i++)
accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1));
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.putAll(accounts);
tx.commit();
}
}
else if (writeMode == WriteMode.DML) {
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
SqlFieldsQuery qry = new SqlFieldsQuery("insert into MvccTestAccount(_key, val, updateCnt) values " +
"(?," + ACCOUNT_START_VAL + ",1)");
for (int i = 0; i < ACCOUNTS; i++) {
try (FieldsQueryCursor<List<?>> cur = cache.query(qry.setArgs(i))) {
assertEquals(1L, cur.iterator().next().get(0));
}
tx.commit();
}
}
}
else
assert false : "Unknown write mode";
}
};
final RemovedAccountsTracker rmvdTracker = new RemovedAccountsTracker(ACCOUNTS);
GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int cnt = 0;
while (!stop.get()) {
TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
try {
IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
cnt++;
int i1 = rnd.nextInt(ACCOUNTS), i2 = rnd.nextInt(ACCOUNTS);
while (i2 == i1)
i2 = rnd.nextInt(ACCOUNTS);
Integer id1 = Math.min(i1, i2);
Integer id2 = Math.max(i1, i2);
Set<Integer> keys = new HashSet<>();
keys.add(id1);
keys.add(id2);
Integer cntr1 = null;
Integer cntr2 = null;
Integer rmvd = null;
Integer inserted = null;
MvccTestAccount a1;
MvccTestAccount a2;
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
tx.timeout(TX_TIMEOUT);
Map<Integer, MvccTestAccount> accounts = null;
if (writeMode == WriteMode.PUT)
accounts = cache.cache.getAll(keys);
else if (writeMode == WriteMode.DML)
accounts = getAllSql(cache);
else
assert false : "Unknown write mode";
a1 = accounts.get(id1);
a2 = accounts.get(id2);
if (!withRmvs) {
assertNotNull(a1);
assertNotNull(a2);
cntr1 = a1.updateCnt + 1;
cntr2 = a2.updateCnt + 1;
if (writeMode == WriteMode.PUT) {
cache.cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
cache.cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
}
else if (writeMode == WriteMode.DML) {
updateSql(cache, id1, a1.val + 1, cntr1);
updateSql(cache, id2, a2.val - 1, cntr2);
}
else
assert false : "Unknown write mode";
}
else {
if (a1 != null || a2 != null) {
if (a1 != null && a2 != null) {
if (rnd.nextInt(10) == 0) {
if (rmvdTracker.size() < ACCOUNTS / 2) {
rmvd = rnd.nextBoolean() ? id1 : id2;
assertTrue(rmvdTracker.markRemoved(rmvd));
}
}
if (rmvd != null) {
if (writeMode == WriteMode.PUT) {
if (rmvd.equals(id1)) {
cache.cache.remove(id1);
cache.cache.put(id2, new MvccTestAccount(a1.val + a2.val, 1));
}
else {
cache.cache.put(id1, new MvccTestAccount(a1.val + a2.val, 1));
cache.cache.remove(id2);
}
}
else if (writeMode == WriteMode.DML) {
if (rmvd.equals(id1)) {
removeSql(cache, id1);
updateSql(cache, id2,a1.val + a2.val, 1);
}
else {
updateSql(cache, id1,a1.val + a2.val, 1);
removeSql(cache, id2);
}
}
else
assert false : "Unknown write mode";
}
else {
if (writeMode == WriteMode.PUT) {
cache.cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
cache.cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
}
else if (writeMode == WriteMode.DML) {
updateSql(cache, id1, a1.val + 1, 1);
updateSql(cache, id2, a2.val - 1, 1);
}
else
assert false : "Unknown write mode";
}
}
else {
if (a1 == null) {
inserted = id1;
if (writeMode == WriteMode.PUT) {
cache.cache.put(id1, new MvccTestAccount(100, 1));
cache.cache.put(id2, new MvccTestAccount(a2.val - 100, 1));
}
else if (writeMode == WriteMode.DML) {
insertSql(cache, id1, 100, 1);
updateSql(cache, id2, a2.val - 100, 1);
}
else
assert false : "Unknown write mode";
}
else {
inserted = id2;
if (writeMode == WriteMode.PUT) {
cache.cache.put(id1, new MvccTestAccount(a1.val - 100, 1));
cache.cache.put(id2, new MvccTestAccount(100, 1));
}
else if (writeMode == WriteMode.DML) {
updateSql(cache, id1, a1.val - 100, 1);
insertSql(cache, id2, 100, 1);
}
else
assert false : "Unknown write mode";
}
}
}
}
tx.commit();
// In case of tx success mark inserted.
if (inserted != null) {
assert withRmvs;
assertTrue(rmvdTracker.unmarkRemoved(inserted));
}
}
catch (Throwable e) {
if (rmvd != null) {
assert withRmvs;
// If tx fails, unmark removed.
assertTrue(rmvdTracker.unmarkRemoved(rmvd));
}
throw e;
}
if (!withRmvs) {
Map<Integer, MvccTestAccount> accounts = null;
if (writeMode == WriteMode.PUT)
accounts = cache.cache.getAll(keys);
else if (writeMode == WriteMode.DML)
accounts = getAllSql(cache);
else
assert false : "Unknown write mode";
a1 = accounts.get(id1);
a2 = accounts.get(id2);
assertNotNull(a1);
assertNotNull(a2);
assertTrue(a1.updateCnt >= cntr1);
assertTrue(a2.updateCnt >= cntr2);
}
}
catch (Exception e) {
handleTxException(e);
}
finally {
cache.readUnlock();
}
}
info("Writer finished, updates: " + cnt);
}
};
GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
Set<Integer> keys = new LinkedHashSet<>();
Map<Integer, Integer> lastUpdateCntrs = new HashMap<>();
SqlFieldsQuery sumQry = new SqlFieldsQuery("select sum(val) from MvccTestAccount");
while (!stop.get()) {
while (keys.size() < ACCOUNTS)
keys.add(rnd.nextInt(ACCOUNTS));
TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
Map<Integer, MvccTestAccount> accounts = null;
try {
switch (readMode) {
case GET: {
accounts = cache.cache.getAll(keys);
break;
}
case SCAN: {
accounts = new HashMap<>();
Iterator<Cache.Entry<Integer, MvccTestAccount>> it = cache.cache.iterator();
try {
for (; it.hasNext(); ) {
IgniteCache.Entry<Integer, MvccTestAccount> e = it.next();
MvccTestAccount old = accounts.put(e.getKey(), e.getValue());
assertNull("new=" + e + ", old=" + old, old);
}
} finally {
U.closeQuiet((AutoCloseable) it);
}
break;
}
case SQL: {
accounts = new HashMap<>();
if (rnd.nextBoolean()) {
SqlQuery<Integer, MvccTestAccount> qry =
new SqlQuery<>(MvccTestAccount.class, "_key >= 0");
for (IgniteCache.Entry<Integer, MvccTestAccount> e : cache.cache.query(qry).getAll()) {
MvccTestAccount old = accounts.put(e.getKey(), e.getValue());
assertNull(old);
}
}
else {
SqlFieldsQuery qry = new SqlFieldsQuery("select _key, val from MvccTestAccount");
for (List<?> row : cache.cache.query(qry).getAll()) {
Integer id = (Integer)row.get(0);
Integer val = (Integer)row.get(1);
MvccTestAccount old = accounts.put(id, new MvccTestAccount(val, 1));
assertNull(old);
}
}
break;
}
case SQL_SUM: {
Long sum;
if (rnd.nextBoolean()) {
List<List<?>> res = cache.cache.query(sumQry).getAll();
assertEquals(1, res.size());
sum = (Long)res.get(0).get(0);
}
else {
Map res = readAllByMode(cache.cache, keys, readMode, ACCOUNT_CODEC);
sum = (Long)((Map.Entry)res.entrySet().iterator().next()).getValue();
}
assertEquals(ACCOUNT_START_VAL * ACCOUNTS, sum.intValue());
break;
}
default: {
fail();
return;
}
}
}
finally {
cache.readUnlock();
}
if (accounts != null) {
if (!withRmvs)
assertEquals(ACCOUNTS, accounts.size());
int sum = 0;
for (int i = 0; i < ACCOUNTS; i++) {
MvccTestAccount account = accounts.get(i);
if (account != null) {
sum += account.val;
Integer cntr = lastUpdateCntrs.get(i);
if (cntr != null)
assertTrue(cntr <= account.updateCnt);
lastUpdateCntrs.put(i, cntr);
}
else
assertTrue(withRmvs);
}
assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
}
}
if (idx == 0) {
TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
Map<Integer, MvccTestAccount> accounts;
ReadMode readMode0 = readMode == SQL_SUM ? SQL : readMode;
try {
accounts = readAllByMode(cache.cache, keys, readMode0, ACCOUNT_CODEC);;
}
finally {
cache.readUnlock();
}
int sum = 0;
for (int i = 0; i < ACCOUNTS; i++) {
MvccTestAccount account = accounts.get(i);
assertTrue(account != null || withRmvs);
info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']');
if (account != null)
sum += account.val;
}
info("Sum: " + sum);
}
}
};
readWriteTest(
restartMode,
srvs,
clients,
cacheBackups,
cacheParts,
writers,
readers,
testTime,
cfgC,
init,
writer,
reader);
}
/**
* Returns all accounts from cache by means of SQL.
*
* @param cache Cache.
* @return All accounts
*/
protected static Map<Integer, MvccTestAccount> getAllSql(TestCache<Integer, MvccTestAccount> cache) {
Map<Integer, MvccTestAccount> accounts = new HashMap<>();
SqlFieldsQuery qry = new SqlFieldsQuery("select _key, val, updateCnt from MvccTestAccount");
for (List<?> row : cache.cache.query(qry).getAll()) {
Integer id = (Integer)row.get(0);
Integer val = (Integer)row.get(1);
Integer updateCnt = (Integer)row.get(2);
MvccTestAccount old = accounts.put(id, new MvccTestAccount(val, updateCnt));
assertNull(old);
}
return accounts;
}
/**
* Updates account by means of SQL API.
*
* @param cache Cache.
* @param key Key.
* @param val Value.
* @param updateCnt Update counter.
*/
private static void updateSql(TestCache<Integer, MvccTestAccount> cache, Integer key, Integer val, Integer updateCnt) {
SqlFieldsQuery qry = new SqlFieldsQuery("update MvccTestAccount set val=" + val + ", updateCnt=" +
updateCnt + " where _key=" + key);
cache.cache.query(qry).getAll();
}
/**
* Removes account by means of SQL API.
*
* @param cache Cache.
* @param key Key.
*/
protected static void removeSql(TestCache<Integer, MvccTestAccount> cache, Integer key) {
SqlFieldsQuery qry = new SqlFieldsQuery("delete from MvccTestAccount where _key=" + key);
cache.cache.query(qry).getAll();
}
/**
* Merge account by means of SQL API.
*
* @param cache Cache.
* @param key Key.
* @param val Value.
* @param updateCnt Update counter.
*/
protected static void mergeSql(TestCache<Integer, MvccTestAccount> cache, Integer key, Integer val, Integer updateCnt) {
SqlFieldsQuery qry = new SqlFieldsQuery("merge into MvccTestAccount(_key, val, updateCnt) values " +
" (" + key + ", " + val + ", " + updateCnt + ")");
cache.cache.query(qry).getAll();
}
/**
* Inserts account by means of SQL API.
*
* @param cache Cache.
* @param key Key.
* @param val Value.
* @param updateCnt Update counter.
*/
private static void insertSql(TestCache<Integer, MvccTestAccount> cache, int key, Integer val, Integer updateCnt) {
SqlFieldsQuery qry = new SqlFieldsQuery("insert into MvccTestAccount(_key, val, updateCnt) values " +
" (" + key + ", " + val + ", " + updateCnt + ")");
cache.cache.query(qry).getAll();
}
/**
* @param restartMode Restart mode.
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
* @param readMode Read mode.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
protected void putAllGetAll(
RestartMode restartMode,
final int srvs,
final int clients,
int cacheBackups,
int cacheParts,
@Nullable IgniteInClosure<CacheConfiguration> cfgC,
ReadMode readMode,
WriteMode writeMode
) throws Exception {
final int RANGE = 20;
final int writers = 4;
final int readers = 4;
GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int min = idx * RANGE;
int max = min + RANGE;
info("Thread range [min=" + min + ", max=" + max + ']');
// Sorted map for put to avoid deadlocks.
Map<Integer, Integer> map = new TreeMap<>();
// Unordered key sequence.
Set<Integer> keys = new LinkedHashSet<>();
int v = idx * 1_000_000;
boolean first = true;
while (!stop.get()) {
while (keys.size() < RANGE) {
int key = rnd.nextInt(min, max);
if (keys.add(key))
map.put(key, v);
}
TestCache<Integer, Integer> cache = randomCache(caches, rnd);
try {
IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
if (!first && rnd.nextBoolean()) {
Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
for (Integer k : keys)
assertEquals("res=" + res, v - 1, (Object)res.get(k));
}
writeAllByMode(cache.cache, map, writeMode, INTEGER_CODEC);
tx.commit();
v++;
first = false;
}
if (rnd.nextBoolean()) {
Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
for (Integer k : keys)
assertEquals("key=" + k, v - 1, (Object)res.get(k));
}
}
catch (Exception e) {
handleTxException(e);
}
finally {
cache.readUnlock();
keys.clear();
map.clear();
}
}
info("Writer done, updates: " + v);
}
};
GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
Set<Integer> keys = new LinkedHashSet<>();
Map<Integer, Integer> readVals = new HashMap<>();
while (!stop.get()) {
int range = rnd.nextInt(0, writers);
int min = range * RANGE;
int max = min + RANGE;
keys.clear();
while (keys.size() < RANGE)
keys.add(rnd.nextInt(min, max));
TestCache<Integer, Integer> cache = randomCache(caches, rnd);
Map<Integer, Integer> map;
try {
map = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
}
catch (Exception e) {
handleTxException(e);
continue;
}
finally {
cache.readUnlock();
}
assertTrue("Invalid map size: " + map.size() + ", map=" + map, map.isEmpty() || map.size() == RANGE);
Integer val0 = null;
for (Map.Entry<Integer, Integer> e: map.entrySet()) {
Integer val = e.getValue();
assertNotNull(val);
if (val0 == null) {
Integer readVal = readVals.get(range);
if (readVal != null)
assertTrue("readVal=" + readVal + ", val=" + val + ", map=" + map,readVal <= val);
readVals.put(range, val);
val0 = val;
}
else {
if (!F.eq(val0, val)) {
assertEquals("Unexpected value [range=" + range + ", key=" + e.getKey() + ']' +
", map=" + map,
val0,
val);
}
}
}
}
}
};
readWriteTest(
restartMode,
srvs,
clients,
cacheBackups,
cacheParts,
writers,
readers,
DFLT_TEST_TIME,
cfgC,
null,
writer,
reader);
for (Ignite node : G.allGrids())
checkActiveQueriesCleanup(node);
}
/**
* @param N Number of object to update in single transaction.
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
* @param time Test time.
* @param readMode Read mode.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
protected void updateNObjectsTest(
final int N,
final int srvs,
final int clients,
int cacheBackups,
int cacheParts,
long time,
@Nullable IgniteInClosure<CacheConfiguration> cfgC,
ReadMode readMode,
WriteMode writeMode,
RestartMode restartMode
)
throws Exception
{
final int TOTAL = 20;
assert N <= TOTAL;
info("updateNObjectsTest [n=" + N + ", total=" + TOTAL + ']');
final int writers = 4;
final int readers = 4;
final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
@Override public void apply(IgniteCache<Object, Object> cache) {
final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
Map<Integer, Integer> vals = new LinkedHashMap<>();
for (int i = 0; i < TOTAL; i++)
vals.put(i, N);
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
writeAllByMode(cache, vals, writeMode, INTEGER_CODEC);
tx.commit();
}
}
};
GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int cnt = 0;
while (!stop.get()) {
TestCache<Integer, Integer> cache = randomCache(caches, rnd);
IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
TreeSet<Integer> keys = new TreeSet<>();
while (keys.size() < N)
keys.add(rnd.nextInt(TOTAL));
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
tx.timeout(TX_TIMEOUT);
Map<Integer, Integer> curVals = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
assertEquals(N, curVals.size());
Map<Integer, Integer> newVals = new TreeMap<>();
for (Map.Entry<Integer, Integer> e : curVals.entrySet())
newVals.put(e.getKey(), e.getValue() + 1);
writeAllByMode(cache.cache, newVals, writeMode, INTEGER_CODEC);
tx.commit();
}
catch (Exception e) {
handleTxException(e);
}
finally {
cache.readUnlock();
}
cnt++;
}
info("Writer finished, updates: " + cnt);
}
};
GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
@Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
Set<Integer> keys = new LinkedHashSet<>();
while (!stop.get()) {
while (keys.size() < TOTAL)
keys.add(rnd.nextInt(TOTAL));
TestCache<Integer, Integer> cache = randomCache(caches, rnd);
Map<Integer, Integer> vals = null;
try {
vals = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
}
catch (Exception e) {
handleTxException(e);
}
finally {
cache.readUnlock();
}
assertEquals("vals=" + vals, TOTAL, vals.size());
int sum = 0;
for (int i = 0; i < TOTAL; i++) {
Integer val = vals.get(i);
assertNotNull(val);
sum += val;
}
assertEquals(0, sum % N);
}
if (idx == 0) {
TestCache<Integer, Integer> cache = randomCache(caches, rnd);
Map<Integer, Integer> vals;
try {
vals = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
}
finally {
cache.readUnlock();
}
int sum = 0;
for (int i = 0; i < TOTAL; i++) {
Integer val = vals.get(i);
info("Value [id=" + i + ", val=" + val + ']');
sum += val;
}
info("Sum [sum=" + sum + ", mod=" + sum % N + ']');
}
}
};
readWriteTest(
restartMode,
srvs,
clients,
cacheBackups,
cacheParts,
writers,
readers,
time,
cfgC,
init,
writer,
reader);
}
/**
* @param restartMode Restart mode.
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
* @param time Test time.
* @param cfgC Optional closure applied to cache configuration.
* @param writers Number of writers.
* @param readers Number of readers.
* @param init Optional init closure.
* @param writer Writers threads closure.
* @param reader Readers threads closure.
* @throws Exception If failed.
*/
final void readWriteTest(
final RestartMode restartMode,
final int srvs,
final int clients,
int cacheBackups,
int cacheParts,
final int writers,
final int readers,
final long time,
@Nullable IgniteInClosure<CacheConfiguration> cfgC,
IgniteInClosure<IgniteCache<Object, Object>> init,
final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer,
final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader) throws Exception {
if (restartMode == RestartMode.RESTART_CRD)
MvccProcessorImpl.coordinatorAssignClosure(new CoordinatorAssignClosure());
Ignite srv0 = startGridsMultiThreaded(srvs);
if (clients > 0) {
client = true;
startGridsMultiThreaded(srvs, clients);
client = false;
}
CacheConfiguration<Object, Object> ccfg = cacheConfiguration(cacheMode(),
FULL_SYNC,
cacheBackups,
cacheParts);
if (restartMode == RestartMode.RESTART_CRD)
ccfg.setNodeFilter(new CoordinatorNodeFilter());
if (cfgC != null)
cfgC.apply(ccfg);
IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
int crdIdx = srvs + clients;
if (restartMode == RestartMode.RESTART_CRD) {
nodeAttr = CRD_ATTR;
startGrid(crdIdx);
}
if (init != null)
init.apply(cache);
final List<TestCache> caches = new ArrayList<>(srvs + clients);
for (int i = 0; i < srvs + clients; i++) {
Ignite node = grid(i);
caches.add(new TestCache(node.cache(cache.getName())));
}
final long stopTime = U.currentTimeMillis() + time;
final AtomicBoolean stop = new AtomicBoolean();
try {
final AtomicInteger writerIdx = new AtomicInteger();
IgniteInternalFuture<?> writeFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
int idx = writerIdx.getAndIncrement();
writer.apply(idx, caches, stop);
}
catch (Throwable e) {
if (restartMode != null && X.hasCause(e, ClusterTopologyException.class)) {
log.info("Writer error: " + e);
return null;
}
error("Unexpected error: " + e, e);
stop.set(true);
fail("Unexpected error: " + e);
}
return null;
}
}, writers, "writer");
final AtomicInteger readerIdx = new AtomicInteger();
IgniteInternalFuture<?> readFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
int idx = readerIdx.getAndIncrement();
reader.apply(idx, caches, stop);
}
catch (Throwable e) {
if (restartMode != null && X.hasCause(e, ClusterTopologyException.class)) {
log.info("Writer error: " + e);
return null;
}
error("Unexpected error: " + e, e);
stop.set(true);
fail("Unexpected error: " + e);
}
return null;
}
}, readers, "reader");
GridTestUtils.runAsync(() -> {
while (System.currentTimeMillis() < stopTime)
doSleep(1000);
stop.set(true);
});
while (System.currentTimeMillis() < stopTime && !stop.get()) {
Thread.sleep(1000);
if (System.currentTimeMillis() >= stopTime || stop.get())
break;
if (restartMode != null) {
switch (restartMode) {
case RESTART_CRD: {
log.info("Start new coordinator: " + (crdIdx + 1));
startGrid(crdIdx + 1);
log.info("Stop current coordinator: " + crdIdx);
stopGrid(crdIdx);
crdIdx++;
awaitPartitionMapExchange();
break;
}
case RESTART_RND_SRV: {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int idx = rnd.nextInt(srvs);
TestCache cache0 = caches.get(idx);
cache0.stopLock.writeLock().lock();
log.info("Stop node: " + idx);
stopGrid(idx);
log.info("Start new node: " + idx);
Ignite srv = startGrid(idx);
cache0 = new TestCache(srv.cache(DEFAULT_CACHE_NAME));
synchronized (caches) {
caches.set(idx, cache0);
}
awaitPartitionMapExchange();
break;
}
default:
fail();
}
}
}
Exception ex = null;
try {
writeFut.get();
}
catch (IgniteCheckedException e) {
ex = e;
}
try {
readFut.get();
}
catch (IgniteCheckedException e) {
if (ex != null)
ex.addSuppressed(e);
else
ex = e;
}
if (ex != null)
throw ex;
}
finally {
stop.set(true);
}
}
/**
* @param cacheMode Cache mode.
* @param syncMode Write synchronization mode.
* @param backups Number of backups.
* @param parts Number of partitions.
* @return Cache configuration.
*/
final CacheConfiguration<Object, Object> cacheConfiguration(
CacheMode cacheMode,
CacheWriteSynchronizationMode syncMode,
int backups,
int parts) {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL_SNAPSHOT);
ccfg.setWriteSynchronizationMode(syncMode);
ccfg.setAffinity(new RendezvousAffinityFunction(false, parts));
if (cacheMode == PARTITIONED)
ccfg.setBackups(backups);
return ccfg;
}
/**
* Handles transaction exception.
* @param e Exception.
*/
protected void handleTxException(Exception e) {
if (log.isDebugEnabled())
log.debug("Exception during tx execution: " + X.getFullStackTrace(e));
if (X.hasCause(e, IgniteFutureCancelledCheckedException.class))
return;
if (X.hasCause(e, ClusterTopologyException.class))
return;
if (X.hasCause(e, ClusterTopologyCheckedException.class))
return;
if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
return;
if (X.hasCause(e, TransactionException.class))
return;
if (X.hasCause(e, IgniteTxTimeoutCheckedException.class))
return;
if (X.hasCause(e, TransactionSerializationException.class))
return;
if (X.hasCause(e, CacheException.class)) {
CacheException cacheEx = X.cause(e, CacheException.class);
if (cacheEx != null && cacheEx.getMessage() != null) {
if (cacheEx.getMessage().contains("Data node has left the grid during query execution"))
return;
}
if (cacheEx != null && cacheEx.getMessage() != null) {
if (cacheEx.getMessage().contains("Query was interrupted."))
return;
}
if (cacheEx != null && cacheEx.getMessage() != null) {
if (cacheEx.getMessage().contains("Failed to fetch data from node"))
return;
}
if (cacheEx != null && cacheEx.getMessage() != null) {
if (cacheEx.getMessage().contains("Failed to send message"))
return;
}
}
if (X.hasCause(e, IgniteSQLException.class)) {
IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
if (sqlEx != null && sqlEx.getMessage() != null) {
if (sqlEx.getMessage().contains("Transaction is already completed."))
return;
}
}
fail("Unexpected tx exception. " + X.getFullStackTrace(e));
}
/**
* @throws Exception If failed.
*/
final void verifyCoordinatorInternalState() throws Exception {
for (Ignite node : G.allGrids()) {
final MvccProcessorImpl crd = mvccProcessor(node);
if (!crd.mvccEnabled())
continue;
crd.stopVacuumWorkers(); // to prevent new futures creation.
Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs");
Map<?, Map> cntrFuts = GridTestUtils.getFieldValue(crd, "snapLsnrs");
Map ackFuts = GridTestUtils.getFieldValue(crd, "ackFuts");
Map activeTrackers = GridTestUtils.getFieldValue(crd, "activeTrackers");
GridAbsPredicate cond = () -> {
log.info("activeTxs=" + activeTxs + ", cntrFuts=" + cntrFuts + ", ackFuts=" + ackFuts +
", activeTrackers=" + activeTrackers);
boolean empty = true;
for (Map map : cntrFuts.values())
if (!(empty = map.isEmpty()))
break;
return activeTxs.isEmpty() && empty && ackFuts.isEmpty() && activeTrackers.isEmpty();
};
GridTestUtils.waitForCondition(cond, TX_TIMEOUT);
assertTrue("activeTxs: " + activeTxs, activeTxs.isEmpty());
boolean empty = true;
for (Map map : cntrFuts.values())
if (!(empty = map.isEmpty())) break;
assertTrue("cntrFuts: " + cntrFuts, empty);
assertTrue("ackFuts: " + ackFuts, ackFuts.isEmpty());
assertTrue("activeTrackers: " + activeTrackers, activeTrackers.isEmpty());
checkActiveQueriesCleanup(node);
}
}
/**
* Checks if less than 2 versions remain after the vacuum cleanup.
*
* @throws Exception If failed.
*/
protected void verifyOldVersionsCleaned() throws Exception {
boolean retry;
try {
runVacuumSync();
// Check versions.
retry = !checkOldVersions(false);
}
catch (Exception e) {
U.warn(log(), "Failed to perform vacuum, will retry.", e);
retry = true;
}
if (retry) { // Retry on a stable topology with a newer snapshot.
awaitPartitionMapExchange();
waitMvccQueriesDone();
runVacuumSync();
checkOldVersions(true);
}
}
/**
* Waits until all active queries are terminated on the Mvcc coordinator.
*
* @throws Exception If failed.
*/
private void waitMvccQueriesDone() throws Exception {
for (Ignite node : G.allGrids()) {
checkActiveQueriesCleanup(node);
}
}
/**
* Checks if outdated versions were cleaned after the vacuum process.
*
* @param failIfNotCleaned Fail test if not cleaned.
* @return {@code False} if not cleaned.
* @throws IgniteCheckedException If failed.
*/
private boolean checkOldVersions(boolean failIfNotCleaned) throws IgniteCheckedException {
for (Ignite node : G.allGrids()) {
for (IgniteCacheProxy cache : ((IgniteKernal)node).caches()) {
GridCacheContext cctx = cache.context();
if (!cctx.userCache() || !cctx.group().mvccEnabled() || F.isEmpty(cctx.group().caches()) || cctx.shared().closed(cctx))
continue;
try (GridCloseableIterator it = (GridCloseableIterator)cache.withKeepBinary().iterator()) {
while (it.hasNext()) {
IgniteBiTuple entry = (IgniteBiTuple)it.next();
KeyCacheObject key = cctx.toCacheKeyObject(entry.getKey());
List<IgniteBiTuple<Object, MvccVersion>> vers = cctx.offheap().mvccAllVersions(cctx, key)
.stream().filter(t -> t.get1() != null).collect(Collectors.toList());
if (vers.size() > 1) {
if (failIfNotCleaned)
fail("[key=" + key.value(null, false) + "; vers=" + vers + ']');
else
return false;
}
}
}
}
}
return true;
}
/**
* Runs vacuum on all nodes and waits for its completion.
*
* @throws IgniteCheckedException If failed.
*/
private void runVacuumSync() throws IgniteCheckedException {
GridCompoundIdentityFuture<VacuumMetrics> fut = new GridCompoundIdentityFuture<>();
// Run vacuum manually.
for (Ignite node : G.allGrids()) {
if (!node.configuration().isClientMode()) {
MvccProcessorImpl crd = mvccProcessor(node);
if (!crd.mvccEnabled() || GridTestUtils.getFieldValue(crd, "vacuumWorkers") == null)
continue;
assert GridTestUtils.getFieldValue(crd, "txLog") != null;
fut.add(crd.runVacuum());
}
}
fut.markInitialized();
// Wait vacuum finished.
fut.get(getTestTimeout());
}
/**
* @param node Ignite node.
* @return Mvcc processor.
*/
protected MvccProcessorImpl mvccProcessor(Ignite node) {
GridKernalContext ctx = ((IgniteEx)node).context();
MvccProcessor crd = ctx.coordinators();
assertNotNull(crd);
return (MvccProcessorImpl)crd;
}
/**
* @param node Node.
* @throws Exception If failed.
*/
protected final void checkActiveQueriesCleanup(Ignite node) throws Exception {
final MvccProcessorImpl prc = mvccProcessor(node);
MvccCoordinator crd = prc.currentCoordinator();
if (!crd.local())
return;
assertTrue("Coordinator is not initialized: " + prc, GridTestUtils.waitForCondition(crd::initialized, 8_000));
assertTrue("Active queries are not cleared: " + node.name(), GridTestUtils.waitForCondition(
new GridAbsPredicate() {
@Override public boolean apply() {
Object activeQueries = GridTestUtils.getFieldValue(prc, "activeQueries");
synchronized (activeQueries) {
Long minQry = GridTestUtils.getFieldValue(activeQueries, "minQry");
if (minQry != null)
log.info("Min query: " + minQry);
Map<Object, Map> queriesMap = GridTestUtils.getFieldValue(activeQueries, "activeQueries");
boolean empty = true;
for (Map.Entry<Object, Map> e : queriesMap.entrySet()) {
if (!e.getValue().isEmpty()) {
empty = false;
log.info("Active queries: " + e);
}
}
return empty && minQry == null;
}
}
}, 8_000)
);
assertTrue("Previous coordinator queries are not empty: " + node.name(), GridTestUtils.waitForCondition(
new GridAbsPredicate() {
@Override public boolean apply() {
PreviousQueries prevQueries = GridTestUtils.getFieldValue(prc, "prevQueries");
synchronized (prevQueries) {
Map queries = GridTestUtils.getFieldValue(prevQueries, "active");
Boolean prevDone = GridTestUtils.getFieldValue(prevQueries, "done");
if (!queries.isEmpty() || !prevDone)
log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']');
return queries.isEmpty();
}
}
}, 8_000)
);
}
/**
* @return Cache configurations.
*/
protected List<CacheConfiguration<Object, Object>> cacheConfigurations() {
List<CacheConfiguration<Object, Object>> ccfgs = new ArrayList<>();
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, RendezvousAffinityFunction.DFLT_PARTITION_COUNT));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, RendezvousAffinityFunction.DFLT_PARTITION_COUNT));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, RendezvousAffinityFunction.DFLT_PARTITION_COUNT));
ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, RendezvousAffinityFunction.DFLT_PARTITION_COUNT));
return ccfgs;
}
/**
* Reads value from cache for the given key using given read mode.
*
* @param cache Cache.
* @param key Key.
* @param readMode Read mode.
* @param codec Sql object codec.
* @return Value.
*/
@SuppressWarnings("unchecked")
protected Object readByMode(IgniteCache cache, final Object key, ReadMode readMode, ObjectCodec codec) {
assert cache != null && key != null && readMode != null && readMode != SQL_SUM;
assert readMode != SQL || codec != null;
boolean emulateLongQry = ThreadLocalRandom.current().nextBoolean();
switch (readMode) {
case GET:
return cache.get(key);
case SCAN:
ScanQuery scanQry = new ScanQuery(new IgniteBiPredicate() {
@Override public boolean apply(Object k, Object v) {
if (emulateLongQry)
doSleep(ThreadLocalRandom.current().nextInt(50));
return k.equals(key);
}
});
List res = cache.query(scanQry).getAll();
assertTrue(res.size() <= 1);
return res.isEmpty() ? null : ((IgniteBiTuple)res.get(0)).getValue();
case SQL:
String qry = "SELECT * FROM " + codec.tableName() + " WHERE _key=" + key;
SqlFieldsQuery sqlFieldsQry = new SqlFieldsQuery(qry);
if (emulateLongQry)
sqlFieldsQry.setLazy(true).setPageSize(1);
List<List> rows;
if (emulateLongQry) {
FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry);
rows = new ArrayList<>();
for (List row : cur) {
rows.add(row);
doSleep(ThreadLocalRandom.current().nextInt(50));
}
}
else
rows = cache.query(sqlFieldsQry).getAll();
assertTrue(rows.size() <= 1);
return rows.isEmpty() ? null : codec.decode(rows.get(0));
default:
throw new AssertionError("Unsupported read mode: " + readMode);
}
}
/**
* Writes value into cache using given write mode.
*
* @param cache Cache.
* @param key Key.
* @param val Value.
* @param writeMode Write mode.
* @param codec Sql object codec.
*/
@SuppressWarnings("unchecked")
protected void writeByMode(IgniteCache cache, final Object key, Object val, WriteMode writeMode, ObjectCodec codec) {
assert writeMode != DML || codec != null;
assert cache != null && key != null && writeMode != null && val != null;
switch (writeMode) {
case PUT:
cache.put(key, val);
return;
case DML:
String qry = "MERGE INTO " + codec.tableName() + " (" + codec.columnsNames() + ") VALUES " +
'(' + key + ", " + codec.encode(val) + ')';
List<List> rows = cache.query(new SqlFieldsQuery(qry)).getAll();
assertTrue(rows.size() <= 1);
return;
default:
throw new AssertionError("Unsupported write mode: " + writeMode);
}
}
/**
* Reads value from cache for the given key using given read mode.
*
* @param cache Cache.
* @param keys Key.
* @param readMode Read mode.
* @param codec Value codec.
* @return Value.
*/
@SuppressWarnings("unchecked")
protected Map readAllByMode(IgniteCache cache, Set keys, ReadMode readMode, ObjectCodec codec) {
assert cache != null && keys != null && readMode != null;
assert readMode != SQL || codec != null;
boolean emulateLongQry = ThreadLocalRandom.current().nextBoolean();
switch (readMode) {
case GET:
return cache.getAll(keys);
case SCAN:
ScanQuery scanQry = new ScanQuery(new IgniteBiPredicate() {
@Override public boolean apply(Object k, Object v) {
if (emulateLongQry)
doSleep(ThreadLocalRandom.current().nextInt(50));
return keys.contains(k);
}
});
Map res;
try (QueryCursor qry = cache.query(scanQry)) {
res = (Map)qry.getAll()
.stream()
.collect(Collectors.toMap(v -> ((IgniteBiTuple)v).getKey(), v -> ((IgniteBiTuple)v).getValue()));
assertTrue("res.size()=" + res.size() + ", keys.size()=" + keys.size(), res.size() <= keys.size());
}
return res;
case SQL:
StringBuilder b = new StringBuilder("SELECT " + codec.columnsNames() + " FROM " + codec.tableName() + " WHERE _key IN (");
boolean first = true;
for (Object key : keys) {
if (first)
first = false;
else
b.append(", ");
b.append(key);
}
b.append(')');
String qry = b.toString();
SqlFieldsQuery sqlFieldsQry = new SqlFieldsQuery(qry);
if (emulateLongQry)
sqlFieldsQry.setLazy(true).setPageSize(1);
List<List> rows;
try (FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry)) {
if (emulateLongQry) {
rows = new ArrayList<>();
for (List row : cur) {
rows.add(row);
doSleep(ThreadLocalRandom.current().nextInt(50));
}
}
else
rows = cur.getAll();
}
if (rows.isEmpty())
return Collections.emptyMap();
res = new HashMap();
for (List row : rows)
res.put(row.get(0), codec.decode(row));
return res;
case SQL_SUM:
b = new StringBuilder("SELECT SUM(" + codec.aggregateColumnName() + ") FROM " + codec.tableName() + " WHERE _key IN (");
first = true;
for (Object key : keys) {
if (first)
first = false;
else
b.append(", ");
b.append(key);
}
b.append(')');
qry = b.toString();
FieldsQueryCursor<List> cur = cache.query(new SqlFieldsQuery(qry));
rows = cur.getAll();
if (rows.isEmpty())
return Collections.emptyMap();
res = new HashMap();
for (List row : rows)
res.put(row.get(0), row.get(0));
return res;
default:
throw new AssertionError("Unsupported read mode: " + readMode);
}
}
/**
* Writes all entries using given write mode.
*
* @param cache Cache.
* @param entries Entries to write.
* @param writeMode Write mode.
* @param codec Entry codec.
*/
@SuppressWarnings("unchecked")
protected void writeAllByMode(IgniteCache cache, final Map entries, WriteMode writeMode, ObjectCodec codec) {
assert cache != null && entries != null && writeMode != null;
assert writeMode != DML || codec != null;
switch (writeMode) {
case PUT:
cache.putAll(entries);
return;
case DML:
StringBuilder b = new StringBuilder("MERGE INTO " + codec.tableName() + " (" + codec.columnsNames() + ") VALUES ");
boolean first = true;
for (Object entry : entries.entrySet()) {
Map.Entry e = (Map.Entry)entry;
if (first)
first = false;
else
b.append(", ");
b.append('(')
.append(e.getKey())
.append(", ")
.append(codec.encode(e.getValue()))
.append(')');
}
String qry = b.toString();
cache.query(new SqlFieldsQuery(qry)).getAll();
return;
default:
throw new AssertionError("Unsupported write mode: " + writeMode);
}
}
/**
* Object codec for SQL queries.
*
* @param <T> Type.
*/
private interface ObjectCodec<T> {
/**
* Decodes object from SQL request result.
*
* @param row SQL request result.
* @return Decoded object.
*/
T decode(List<?> row);
/**
* Encodes object into SQL string for INSERT clause.
*
* @param obj Object.
* @return Sql string.
*/
String encode(T obj);
/**
* @return Table name.
*/
String tableName();
/**
* @return Columns names.
*/
String columnsNames();
/**
* @return Column for aggregate functions.
*/
String aggregateColumnName();
}
/**
* Codec for {@code Integer} table.
*/
private static class IntegerCodec implements ObjectCodec<Integer> {
/** {@inheritDoc} */
@Override public Integer decode(List<?> row) {
return (Integer)row.get(1);
}
/** {@inheritDoc} */
@Override public String encode(Integer obj) {
return String.valueOf(obj);
}
/** {@inheritDoc} */
@Override public String tableName() {
return "Integer";
}
/** {@inheritDoc} */
@Override public String columnsNames() {
return "_key, _val";
}
/** {@inheritDoc} */
@Override public String aggregateColumnName() {
return "_val";
}
}
/**
* Codec for {@code MvccTestAccount} table.
*/
private static class AccountCodec implements ObjectCodec<MvccTestAccount> {
/** {@inheritDoc} */
@Override public MvccTestAccount decode(List<?> row) {
Integer val = (Integer)row.get(1);
Integer updateCnt = (Integer)row.get(2);
return new MvccTestAccount(val, updateCnt);
}
/** {@inheritDoc} */
@Override public String encode(MvccTestAccount obj) {
return String.valueOf(obj.val) + ", " + String.valueOf(obj.updateCnt);
}
/** {@inheritDoc} */
@Override public String tableName() {
return "MvccTestAccount";
}
/** {@inheritDoc} */
@Override public String columnsNames() {
return "_key, val, updateCnt";
}
/** {@inheritDoc} */
@Override public String aggregateColumnName() {
return "val";
}
}
/**
* @param caches Caches.
* @param rnd Random.
* @return Random cache.
*/
static <K, V> TestCache<K, V> randomCache(
List<TestCache> caches,
ThreadLocalRandom rnd) {
synchronized (caches) {
if (caches.size() == 1) {
TestCache cache = caches.get(0);
assertTrue(cache.readLock());
return cache;
}
for (;;) {
int idx = rnd.nextInt(caches.size());
TestCache testCache = caches.get(idx);
if (testCache.readLock())
return testCache;
}
}
}
/**
*
*/
static class MvccTestAccount {
/** */
@QuerySqlField(index = false)
final int val;
/** */
@QuerySqlField
final int updateCnt;
/**
* @param val Value.
* @param updateCnt Updates counter.
*/
MvccTestAccount(int val, int updateCnt) {
assert updateCnt > 0;
this.val = val;
this.updateCnt = updateCnt;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
MvccTestAccount account = (MvccTestAccount)o;
return val == account.val &&
updateCnt == account.updateCnt;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(val, updateCnt);
}
/** {@inheritDoc} */
@Override public String toString() {
return "MvccTestAccount{" +
"val=" + val +
", updateCnt=" + updateCnt +
'}';
}
}
/**
*
*/
enum ReadMode {
/** */
GET,
/** */
SCAN,
/** */
SQL,
/** */
SQL_SUM,
/** */
INVOKE
}
/**
*
*/
enum WriteMode {
/** */
DML,
/** */
PUT,
/** */
INVOKE
}
/**
*
*/
enum RestartMode {
/**
* Dedicated coordinator node is restarted during test.
*/
RESTART_CRD,
/** */
RESTART_RND_SRV
}
/**
*
*/
static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
return node.attribute(CRD_ATTR) == null;
}
}
/**
*
*/
static class CoordinatorAssignClosure implements IgniteClosure<Collection<ClusterNode>, ClusterNode> {
/** {@inheritDoc} */
@Override public ClusterNode apply(Collection<ClusterNode> clusterNodes) {
for (ClusterNode node : clusterNodes) {
if (node.attribute(CRD_ATTR) != null) {
assert !node.isClient();
return node;
}
}
return null;
}
}
/**
*
*/
static class TestCache<K, V> {
/** */
final IgniteCache<K, V> cache;
/** Locks node to avoid node restart while test operation is in progress. */
final ReadWriteLock stopLock = new ReentrantReadWriteLock();
/**
* @param cache Cache.
*/
TestCache(IgniteCache cache) {
this.cache = cache;
}
/**
* @return {@code True} if locked.
*/
boolean readLock() {
return stopLock.readLock().tryLock();
}
/**
*
*/
void readUnlock() {
stopLock.readLock().unlock();
}
}
/**
*
*/
static class InitIndexing implements IgniteInClosure<CacheConfiguration> {
/** */
private final Class[] idxTypes;
/**
* @param idxTypes Indexed types.
*/
InitIndexing(Class<?>... idxTypes) {
this.idxTypes = idxTypes;
}
/** {@inheritDoc} */
@Override public void apply(CacheConfiguration cfg) {
cfg.setIndexedTypes(idxTypes);
}
}
/**
* Removed accounts tracker.
*/
private static class RemovedAccountsTracker {
/** */
private final Map<Integer, Integer> rmvdKeys;
/**
* @param size Size.
*/
RemovedAccountsTracker(int size) {
this.rmvdKeys = new HashMap<>(size);
for (int i = 0; i < size; i++)
rmvdKeys.put(i, 0);
}
/**
* @return Size.
*/
public synchronized int size() {
int size = 0;
for (int i = 0; i < rmvdKeys.size(); i++) {
if (rmvdKeys.get(i) > 0)
size++;
}
return size;
}
/**
* @param id Id.
* @return {@code True} if success.
*/
synchronized boolean markRemoved(Integer id) {
Integer rmvdCntr = rmvdKeys.get(id);
Integer newCntr = rmvdCntr + 1;
rmvdKeys.put(id, newCntr);
return newCntr >= 0;
}
/**
* @param id Id.
* @return {@code True} if success.
*/
synchronized boolean unmarkRemoved(Integer id) {
Integer rmvdCntr = rmvdKeys.get(id);
Integer newCntr = rmvdCntr - 1;
rmvdKeys.put(id, newCntr);
return newCntr >= 0;
}
}
}