blob: e5abbd84e5d69f4b661561a069da997e9bb49989 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Test;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.INVOKE;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
/**
* Test basic mvcc bulk cache operations.
*/
public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
/** {@inheritDoc} */
@Override protected CacheMode cacheMode() {
return CacheMode.PARTITIONED;
}
/** */
private int nodesCount() {
return 4;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
cleanPersistenceDir();
startGridsMultiThreaded(nodesCount() - 1);
client = true;
startGrid(nodesCount() - 1);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
cleanPersistenceDir();
super.afterTestsStopped();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
grid(0).createCache(cacheConfiguration(cacheMode(), FULL_SYNC, 1, 32).
setIndexedTypes(Integer.class, MvccTestAccount.class));
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
grid(0).destroyCache(DEFAULT_CACHE_NAME);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationGetPut() throws Exception {
checkOperations(GET, GET, PUT, true);
checkOperations(GET, GET, PUT, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationInvoke() throws Exception {
checkOperations(GET, GET, WriteMode.INVOKE, true);
checkOperations(GET, GET, WriteMode.INVOKE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationSqlPut() throws Exception {
checkOperations(SQL, SQL, PUT, true);
checkOperations(SQL, SQL, PUT, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationSqlInvoke() throws Exception {
checkOperations(SQL, SQL, WriteMode.INVOKE, true);
checkOperations(SQL, SQL, WriteMode.INVOKE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationSqlDml() throws Exception {
checkOperations(SQL, SQL, DML, true);
checkOperations(SQL, SQL, DML, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationGetDml() throws Exception {
checkOperations(GET, GET, DML, true);
checkOperations(GET, GET, DML, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationMixedPut() throws Exception {
checkOperations(SQL, GET, PUT, false);
checkOperations(SQL, GET, PUT, true);
checkOperations(SQL, GET, WriteMode.INVOKE, false);
checkOperations(SQL, GET, WriteMode.INVOKE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationMixedPut2() throws Exception {
checkOperations(GET, SQL, PUT, false);
checkOperations(GET, SQL, PUT, true);
checkOperations(GET, SQL, WriteMode.INVOKE, false);
checkOperations(GET, SQL, WriteMode.INVOKE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationMixedDml() throws Exception {
checkOperations(SQL, GET, DML, false);
checkOperations(SQL, GET, DML, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testRepeatableReadIsolationMixedDml2() throws Exception {
checkOperations(GET, SQL, DML, false);
checkOperations(GET, SQL, DML, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOperationConsistency() throws Exception {
checkOperationsConsistency(PUT, false);
checkOperationsConsistency(DML, false);
checkOperationsConsistency(WriteMode.INVOKE, false);
checkOperationsConsistency(PUT, true);
checkOperationsConsistency(DML, true);
checkOperationsConsistency(WriteMode.INVOKE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testInvokeConsistency() throws Exception {
Ignite node = grid(/*requestFromClient ? nodesCount() - 1 :*/ 0);
TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
final Set<Integer> keys1 = new HashSet<>(3);
final Set<Integer> keys2 = new HashSet<>(3);
Set<Integer> allKeys = generateKeySet(cache.cache, keys1, keys2);
final Map<Integer, MvccTestAccount> map1 = keys1.stream().collect(
Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
final Map<Integer, MvccTestAccount> map2 = keys2.stream().collect(
Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
assertEquals(0, cache.cache.size());
updateEntries(cache, map1, WriteMode.INVOKE);
assertEquals(3, cache.cache.size());
updateEntries(cache, map1, WriteMode.INVOKE);
assertEquals(3, cache.cache.size());
getEntries(cache, allKeys, INVOKE);
assertEquals(3, cache.cache.size());
updateEntries(cache, map2, WriteMode.INVOKE);
assertEquals(6, cache.cache.size());
getEntries(cache, keys2, INVOKE);
assertEquals(6, cache.cache.size());
removeEntries(cache, keys1, WriteMode.INVOKE);
assertEquals(3, cache.cache.size());
removeEntries(cache, keys1, WriteMode.INVOKE);
assertEquals(3, cache.cache.size());
getEntries(cache, allKeys, INVOKE);
assertEquals(3, cache.cache.size());
updateEntries(cache, map1, WriteMode.INVOKE);
assertEquals(6, cache.cache.size());
removeEntries(cache, allKeys, WriteMode.INVOKE);
assertEquals(0, cache.cache.size());
getEntries(cache, allKeys, INVOKE);
assertEquals(0, cache.cache.size());
}
/**
* Checks SQL and CacheAPI operation isolation consistency.
*
* @param readModeBefore read mode used before value updated.
* @param readModeAfter read mode used after value updated.
* @param writeMode write mode used for update.
* @throws Exception If failed.
*/
private void checkOperations(ReadMode readModeBefore, ReadMode readModeAfter,
WriteMode writeMode, boolean readFromClient) throws Exception {
Ignite node1 = grid(readFromClient ? nodesCount() - 1 : 0);
Ignite node2 = grid(readFromClient ? 0 : nodesCount() - 1);
TestCache<Integer, MvccTestAccount> cache1 = new TestCache<>(node1.cache(DEFAULT_CACHE_NAME));
TestCache<Integer, MvccTestAccount> cache2 = new TestCache<>(node2.cache(DEFAULT_CACHE_NAME));
final Set<Integer> keysForUpdate = new HashSet<>(3);
final Set<Integer> keysForRemove = new HashSet<>(3);
final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
final Map<Integer, MvccTestAccount> initialMap = allKeys.stream().collect(
Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
final Map<Integer, MvccTestAccount> updateMap = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(),
k -> new MvccTestAccount(k, 2))); /* Removed keys are excluded. */
cache1.cache.putAll(initialMap);
IgniteTransactions txs1 = node1.transactions();
IgniteTransactions txs2 = node2.transactions();
CountDownLatch updateStart = new CountDownLatch(1);
CountDownLatch updateFinish = new CountDownLatch(1);
// Start concurrent transactions and check isolation.
IgniteInternalFuture<Void> updater = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
updateStart.await();
assertEquals(initialMap.size(), cache2.cache.size());
try (Transaction tx = txs2.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
tx.timeout(TX_TIMEOUT);
updateEntries(cache2, updateMap, writeMode);
removeEntries(cache2, keysForRemove, writeMode);
assertEquals(updateMap, cache2.cache.getAll(allKeys));
tx.commit();
}
finally {
updateFinish.countDown();
}
assertEquals(updateMap.size(), cache2.cache.size());
return null;
}
});
IgniteInternalFuture<Void> reader = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try (Transaction tx = txs1.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
assertEquals(initialMap, getEntries(cache1, allKeys, readModeBefore));
checkContains(cache1, true, allKeys);
updateStart.countDown();
updateFinish.await();
assertEquals(initialMap, getEntries(cache1, allKeys, readModeAfter));
checkContains(cache1, true,allKeys);
tx.commit();
}
return null;
}
});
try {
updater.get(3_000, TimeUnit.MILLISECONDS);
reader.get(3_000, TimeUnit.MILLISECONDS);
}
catch (Throwable e) {
throw new AssertionError(e);
}
finally {
updateStart.countDown();
updateFinish.countDown();
}
assertEquals(updateMap, cache1.cache.getAll(allKeys));
}
/**
* Generate 2 sets of keys. Each set contains primary, backup and non-affinity key for given node cache.
*
* @param cache Cache.
* @param keySet1 Key set.
* @param keySet2 Key set.
* @return All keys.
* @throws IgniteCheckedException If failed.
*/
protected Set<Integer> generateKeySet(IgniteCache<?, ?> cache, Set<Integer> keySet1,
Set<Integer> keySet2) throws IgniteCheckedException {
LinkedHashSet<Integer> allKeys = new LinkedHashSet<>();
allKeys.addAll(primaryKeys(cache, 2));
allKeys.addAll(backupKeys(cache, 2, 1));
allKeys.addAll(nearKeys(cache, 2, 1));
List<Integer> keys0 = new ArrayList<>(allKeys);
for (int i = 0; i < 6; i++) {
if (i % 2 == 0)
keySet1.add(keys0.get(i));
else
keySet2.add(keys0.get(i));
}
assert allKeys.size() == 6; // Expects no duplicates.
return allKeys;
}
/**
* Checks SQL and CacheAPI operation see consistent results before and after update.
*
* @throws Exception If failed.
*/
private void checkOperationsConsistency(WriteMode writeMode, boolean requestFromClient) throws Exception {
Ignite node = grid(requestFromClient ? nodesCount() - 1 : 0);
TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
final Set<Integer> keysForUpdate = new HashSet<>(3);
final Set<Integer> keysForRemove = new HashSet<>(3);
final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
try {
int updCnt = 1;
final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
updateEntries(cache, initialVals, writeMode);
assertEquals(initialVals.size(), cache.cache.size());
IgniteTransactions txs = node.transactions();
Map<Integer, MvccTestAccount> updatedVals = null;
try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
Map<Integer, MvccTestAccount> vals3 = getEntries(cache, allKeys, ReadMode.INVOKE);
assertEquals(initialVals, vals1);
assertEquals(initialVals, vals2);
assertEquals(initialVals, vals3);
assertEquals(initialVals.size(), cache.cache.size());
for (ReadMode readMode : new ReadMode[] {GET, SQL, INVOKE}) {
int updCnt0 = ++updCnt;
updatedVals = allKeys.stream().collect(Collectors.toMap(Function.identity(),
k -> new MvccTestAccount(k, updCnt0)));
updateEntries(cache, updatedVals, writeMode);
assertEquals(allKeys.size(), cache.cache.size());
removeEntries(cache, keysForRemove, writeMode);
for (Integer key : keysForRemove)
updatedVals.remove(key);
assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
}
tx.commit();
}
try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
assertEquals(updatedVals, getEntries(cache, allKeys, GET));
assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
assertEquals(updatedVals, getEntries(cache, allKeys, INVOKE));
tx.commit();
}
assertEquals(updatedVals.size(), cache.cache.size());
}
finally {
cache.cache.removeAll(keysForUpdate);
}
assertEquals(0, cache.cache.size());
}
/**
* Gets values with given read mode.
*
* @param cache Cache.
* @param keys Key to be read.
* @param readMode Read mode.
* @return Key-value result map.
*/
protected Map<Integer, MvccTestAccount> getEntries(
TestCache<Integer, MvccTestAccount> cache,
Set<Integer> keys,
ReadMode readMode) {
switch (readMode) {
case GET:
return cache.cache.getAll(keys);
case SQL:
return getAllSql(cache);
case INVOKE: {
Map<Integer, MvccTestAccount> res = new HashMap<>();
CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new GetEntryProcessor<>();
Map<Integer, EntryProcessorResult<MvccTestAccount>> invokeRes = cache.cache.invokeAll(keys, ep);
for (Map.Entry<Integer, EntryProcessorResult<MvccTestAccount>> e : invokeRes.entrySet())
res.put(e.getKey(), e.getValue().get());
return res;
}
default:
fail();
}
return null;
}
/**
* Updates entries with given write mode.
*
* @param cache Cache.
* @param entries Entries to be updated.
* @param writeMode Write mode.
*/
protected void updateEntries(
TestCache<Integer, MvccTestAccount> cache,
Map<Integer, MvccTestAccount> entries,
WriteMode writeMode) {
switch (writeMode) {
case PUT: {
cache.cache.putAll(entries);
break;
}
case DML: {
for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet())
mergeSql(cache, e.getKey(), e.getValue().val, e.getValue().updateCnt);
break;
}
case INVOKE: {
CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep =
new GetAndPutEntryProcessor<Integer, MvccTestAccount>() {
/** {@inheritDoc} */
@Override MvccTestAccount newValForKey(Integer key) {
return entries.get(key);
}
};
cache.cache.invokeAll(entries.keySet(), ep);
break;
}
default:
fail();
}
}
/**
* Updates entries with given write mode.
*
* @param cache Cache.
* @param keys Key to be deleted.
* @param writeMode Write mode.
*/
protected void removeEntries(
TestCache<Integer, MvccTestAccount> cache,
Set<Integer> keys,
WriteMode writeMode) {
switch (writeMode) {
case PUT: {
cache.cache.removeAll(keys);
break;
}
case DML: {
for (Integer key : keys)
removeSql(cache, key);
break;
}
case INVOKE: {
CacheEntryProcessor<Integer, MvccTestAccount, MvccTestAccount> ep = new RemoveEntryProcessor<>();
cache.cache.invokeAll(keys, ep);
break;
}
default:
fail();
}
}
/**
* Check cache contains entries.
*
* @param cache Cache.
* @param expected Expected result.
* @param keys Keys to check.
*/
protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, Set<Integer> keys) {
assertEquals(expected, cache.cache.containsKeys(keys));
}
/**
* Applies get operation.
*/
static class GetEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
/** {@inheritDoc} */
@Override public V process(MutableEntry<K, V> entry,
Object... arguments) throws EntryProcessorException {
return entry.getValue();
}
}
/**
* Applies remove operation.
*/
static class RemoveEntryProcessor<K, V, R> implements CacheEntryProcessor<K, V, R> {
/** {@inheritDoc} */
@Override public R process(MutableEntry<K, V> entry,
Object... arguments) throws EntryProcessorException {
entry.remove();
return null;
}
}
/**
* Applies get and put operation.
*/
static class GetAndPutEntryProcessor<K, V> implements CacheEntryProcessor<K, V, V> {
/** {@inheritDoc} */
@Override public V process(MutableEntry<K, V> entry,
Object... args) throws EntryProcessorException {
V newVal = !F.isEmpty(args) ? (V)args[0] : newValForKey(entry.getKey());
V oldVal = entry.getValue();
entry.setValue(newVal);
return oldVal;
}
/**
* @param key Key.
* @return New value.
*/
V newValForKey(K key) {
throw new UnsupportedOperationException();
}
}
}