blob: 5b948288c4b0c5157e0d923044f64f0a271f1612 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
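/**
* Tests that cache transform operations ({@code invoke}/{@code invokeAll} with an entry processor)
* record {@code EVT_CACHE_OBJECT_READ} events carrying the entry processor class name on the expected nodes.
*/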
@SuppressWarnings("ConstantConditions")
public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
/** Number of nodes started for every test. */
private static final int GRID_CNT = 3;
/** Backups count; applied to the cache configuration only when the cache mode is PARTITIONED. */
private static final int BACKUP_CNT = 1;
/** Name of the cache under test. */
private static final String CACHE_NAME = "cache";
/** First test key; in PARTITIONED mode it is chosen so that node 0 is primary and node 1 is backup. */
private Integer key1;
/** Second test key; greater than {@link #key1} and chosen by the same rule. */
private Integer key2;
/** Both test keys as a sorted set (passed to {@code invokeAll}). */
private Set<Integer> keys;
/** Started nodes, indexed by grid index. */
private Ignite[] ignites;
/** Local node IDs, indexed by grid index. */
private UUID[] ids;
/** Cache instances obtained from each node, indexed by grid index. */
private IgniteCache<Integer, Integer>[] caches;
/** Recorded read events that carry a non-null closure class name. */
private GridConcurrentHashSet<CacheEvent> evts;
/** Cache mode used when building the cache configuration. */
private CacheMode cacheMode;
/** Atomicity mode used when building the cache configuration. */
private CacheAtomicityMode atomicityMode;
/** Default TX concurrency set on the transaction configuration ({@code null} for ATOMIC checks). */
private TransactionConcurrency txConcurrency;
/** Default TX isolation set on the transaction configuration ({@code null} for ATOMIC checks). */
private TransactionIsolation txIsolation;
/**
 * {@inheritDoc}
 * <p>
 * On top of the inherited configuration this applies the current test parameters: the default
 * transaction concurrency/isolation, a single cache named {@link #CACHE_NAME} with the configured
 * cache/atomicity mode and {@code FULL_SYNC} writes, and recording of {@code EVT_CACHE_OBJECT_READ}.
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    TransactionConfiguration tCfg = cfg.getTransactionConfiguration();

    tCfg.setDefaultTxConcurrency(txConcurrency);
    tCfg.setDefaultTxIsolation(txIsolation);

    // Name the cache directly instead of constructing it with another name and renaming it afterwards.
    CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);

    ccfg.setCacheMode(cacheMode);
    ccfg.setAtomicityMode(atomicityMode);
    ccfg.setWriteSynchronizationMode(FULL_SYNC);

    // Backups are configured only for the PARTITIONED mode.
    if (cacheMode == PARTITIONED)
        ccfg.setBackups(BACKUP_CNT);

    cfg.setCacheConfiguration(ccfg);
    cfg.setLocalHost("127.0.0.1");

    // Only read events are needed: that is the event type the test listeners subscribe to.
    cfg.setIncludeEventTypes(EVT_CACHE_OBJECT_READ);

    return cfg;
}
/**
 * {@inheritDoc}
 * <p>
 * Stops every grid and then drops all per-test state held in fields.
 */
@Override protected void afterTest() throws Exception {
    // Nodes are stopped first; only then are the references the listeners use released.
    stopAllGrids();

    // Keys.
    keys = null;
    key2 = null;
    key1 = null;

    // Recorded events.
    evts = null;

    // Node-related arrays.
    caches = null;
    ids = null;
    ignites = null;
}
/**
 * Initialization routine: stores the test parameters, starts {@link #GRID_CNT} grids, subscribes an
 * event listener on every node, picks the two test keys and puts initial values for them.
 *
 * @param cacheMode Cache mode.
 * @param atomicityMode Atomicity mode.
 * @param txConcurrency TX concurrency.
 * @param txIsolation TX isolation.
 * @throws Exception If failed.
 */
@SuppressWarnings("unchecked")
private void initialize(CacheMode cacheMode, CacheAtomicityMode atomicityMode,
    TransactionConcurrency txConcurrency, TransactionIsolation txIsolation) throws Exception {
    // getConfiguration() reads these fields, so they are assigned before any grid is started.
    this.cacheMode = cacheMode;
    this.atomicityMode = atomicityMode;
    this.txConcurrency = txConcurrency;
    this.txIsolation = txIsolation;

    evts = new GridConcurrentHashSet<>();

    startGridsMultiThreaded(GRID_CNT, true);

    if (cacheMode == REPLICATED)
        awaitPartitionMapExchange();

    ignites = new Ignite[GRID_CNT];
    ids = new UUID[GRID_CNT];
    caches = new IgniteCache[GRID_CNT];

    for (int i = 0; i < GRID_CNT; i++) {
        ignites[i] = grid(i);
        ids[i] = ignites[i].cluster().localNode().id();
        caches[i] = ignites[i].cache(CACHE_NAME);

        // One listener per node is enough; a second identical listener would only re-add
        // the same event objects to the set.
        ignites[i].events().localListen(new IgnitePredicate<Event>() {
            @Override public boolean apply(Event evt) {
                CacheEvent evt0 = (CacheEvent)evt;

                // Only events that carry a closure class name are of interest.
                if (evt0.closureClassName() != null) {
                    System.out.println("ADDED: [nodeId=" + evt0.node() + ", evt=" + evt0 + ']');

                    evts.add(evt0);
                }

                return true;
            }
        }, EVT_CACHE_OBJECT_READ);
    }

    key1 = nextTestKey(0);
    key2 = nextTestKey(key1 + 1);

    keys = new TreeSet<>();

    keys.add(key1);
    keys.add(key2);

    caches[0].put(key1, 1);
    caches[0].put(key2, 2);
}

/**
 * Finds the smallest key not less than {@code from} that is suitable for the test. For a PARTITIONED
 * cache a key is suitable when node 0 is its primary and node 1 is its backup; for any other cache
 * mode every key is suitable.
 *
 * @param from First candidate key.
 * @return Suitable key.
 */
private int nextTestKey(int from) {
    int key = from;

    while (cacheMode == PARTITIONED && !(primary(0, key) && backup(1, key)))
        key++;

    return key;
}
/**
 * Asks the affinity of the test cache (obtained from grid 0) whether the local node of the
 * given grid is primary for the key.
 *
 * @param gridIdx Grid index.
 * @param key Key.
 * @return {@code True} if grid is primary for given key.
 */
private boolean primary(int gridIdx, Object key) {
    return grid(0).affinity(CACHE_NAME).isPrimary(grid(gridIdx).cluster().localNode(), key);
}
/**
 * Asks the affinity of the test cache (obtained from grid 0) whether the local node of the
 * given grid is a backup for the key.
 *
 * @param gridIdx Grid index.
 * @param key Key.
 * @return {@code True} if grid is backup for given key.
 */
private boolean backup(int gridIdx, Object key) {
    Affinity<Object> aff = grid(0).affinity(CACHE_NAME);

    return aff.isBackup(grid(gridIdx).cluster().localNode(), key);
}
/**
* Checks transform event recording for a TRANSACTIONAL LOCAL cache with OPTIMISTIC/REPEATABLE_READ
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxLocalOptimisticRepeatableRead() throws Exception {
checkTx(LOCAL, OPTIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL LOCAL cache with OPTIMISTIC/READ_COMMITTED
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxLocalOptimisticReadCommitted() throws Exception {
checkTx(LOCAL, OPTIMISTIC, READ_COMMITTED);
}
/**
* Checks transform event recording for a TRANSACTIONAL LOCAL cache with OPTIMISTIC/SERIALIZABLE
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxLocalOptimisticSerializable() throws Exception {
checkTx(LOCAL, OPTIMISTIC, SERIALIZABLE);
}
/**
* Checks transform event recording for a TRANSACTIONAL LOCAL cache with PESSIMISTIC/REPEATABLE_READ
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxLocalPessimisticRepeatableRead() throws Exception {
checkTx(LOCAL, PESSIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL LOCAL cache with PESSIMISTIC/READ_COMMITTED
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxLocalPessimisticReadCommitted() throws Exception {
checkTx(LOCAL, PESSIMISTIC, READ_COMMITTED);
}
/**
* Checks transform event recording for a TRANSACTIONAL LOCAL cache with PESSIMISTIC/SERIALIZABLE
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxLocalPessimisticSerializable() throws Exception {
checkTx(LOCAL, PESSIMISTIC, SERIALIZABLE);
}
/**
* Checks transform event recording for a TRANSACTIONAL_SNAPSHOT LOCAL cache with
* PESSIMISTIC/REPEATABLE_READ configured as the default transaction concurrency/isolation.
* Currently ignored; see the issue referenced in the {@code @Ignore} annotation.
*
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9530")
@Test
public void testMvccTxLocalPessimisticRepeatableRead() throws Exception {
checkMvccTx(LOCAL, PESSIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/REPEATABLE_READ
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxPartitionedOptimisticRepeatableRead() throws Exception {
checkTx(PARTITIONED, OPTIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/READ_COMMITTED
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxPartitionedOptimisticReadCommitted() throws Exception {
checkTx(PARTITIONED, OPTIMISTIC, READ_COMMITTED);
}
/**
* Checks transform event recording for a TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/SERIALIZABLE
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxPartitionedOptimisticSerializable() throws Exception {
checkTx(PARTITIONED, OPTIMISTIC, SERIALIZABLE);
}
/**
* Checks transform event recording for a TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/REPEATABLE_READ
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxPartitionedPessimisticRepeatableRead() throws Exception {
checkTx(PARTITIONED, PESSIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/READ_COMMITTED
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxPartitionedPessimisticReadCommitted() throws Exception {
checkTx(PARTITIONED, PESSIMISTIC, READ_COMMITTED);
}
/**
* Checks transform event recording for a TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/SERIALIZABLE
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxPartitionedPessimisticSerializable() throws Exception {
checkTx(PARTITIONED, PESSIMISTIC, SERIALIZABLE);
}
/**
* Checks transform event recording for a TRANSACTIONAL_SNAPSHOT PARTITIONED cache with
* PESSIMISTIC/REPEATABLE_READ configured as the default transaction concurrency/isolation.
* Currently ignored; see the issue referenced in the {@code @Ignore} annotation.
*
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9321")
@Test
public void testMvccTxPartitionedPessimisticRepeatableRead() throws Exception {
checkMvccTx(PARTITIONED, PESSIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL REPLICATED cache with OPTIMISTIC/REPEATABLE_READ
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxReplicatedOptimisticRepeatableRead() throws Exception {
checkTx(REPLICATED, OPTIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL REPLICATED cache with OPTIMISTIC/READ_COMMITTED
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxReplicatedOptimisticReadCommitted() throws Exception {
checkTx(REPLICATED, OPTIMISTIC, READ_COMMITTED);
}
/**
* Checks transform event recording for a TRANSACTIONAL REPLICATED cache with OPTIMISTIC/SERIALIZABLE
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxReplicatedOptimisticSerializable() throws Exception {
checkTx(REPLICATED, OPTIMISTIC, SERIALIZABLE);
}
/**
* Checks transform event recording for a TRANSACTIONAL REPLICATED cache with PESSIMISTIC/REPEATABLE_READ
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxReplicatedPessimisticRepeatableRead() throws Exception {
checkTx(REPLICATED, PESSIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for a TRANSACTIONAL REPLICATED cache with PESSIMISTIC/READ_COMMITTED
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxReplicatedPessimisticReadCommitted() throws Exception {
checkTx(REPLICATED, PESSIMISTIC, READ_COMMITTED);
}
/**
* Checks transform event recording for a TRANSACTIONAL REPLICATED cache with PESSIMISTIC/SERIALIZABLE
* configured as the default transaction concurrency/isolation.
*
* @throws Exception If failed.
*/
@Test
public void testTxReplicatedPessimisticSerializable() throws Exception {
checkTx(REPLICATED, PESSIMISTIC, SERIALIZABLE);
}
/**
* Checks transform event recording for a TRANSACTIONAL_SNAPSHOT REPLICATED cache with
* PESSIMISTIC/REPEATABLE_READ configured as the default transaction concurrency/isolation.
* Currently ignored; see the issue referenced in the {@code @Ignore} annotation.
*
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9321")
@Test
public void testMvccTxReplicatedPessimisticRepeatableRead() throws Exception {
checkMvccTx(REPLICATED, PESSIMISTIC, REPEATABLE_READ);
}
/**
* Checks transform event recording for an ATOMIC LOCAL cache.
*
* @throws Exception If failed.
*/
@Test
public void testAtomicLocal() throws Exception {
checkAtomic(LOCAL);
}
/**
* Checks transform event recording for an ATOMIC PARTITIONED cache.
*
* @throws Exception If failed.
*/
@Test
public void testAtomicPartitioned() throws Exception {
checkAtomic(PARTITIONED);
}
/**
* Checks transform event recording for an ATOMIC REPLICATED cache.
*
* @throws Exception If failed.
*/
@Test
public void testAtomicReplicated() throws Exception {
checkAtomic(REPLICATED);
}
/**
 * Check ATOMIC cache: runs {@code invoke} and {@code invokeAll} from node 0 with both entry
 * processors and verifies that events were recorded exactly on the primary nodes of the touched keys.
 *
 * @param cacheMode Cache mode.
 * @throws Exception If failed.
 */
private void checkAtomic(CacheMode cacheMode) throws Exception {
    // No TX settings are passed for the ATOMIC checks.
    initialize(cacheMode, ATOMIC, null, null);

    caches[0].invoke(key1, new Transformer());

    checkEventNodeIdsStrict(Transformer.class.getName(), primaryIdsForKeys(key1));

    // A JUnit assertion is used instead of the 'assert' keyword so the check does not depend on -ea.
    assertTrue(evts.isEmpty());

    caches[0].invokeAll(keys, new Transformer());

    checkEventNodeIdsStrict(Transformer.class.getName(), primaryIdsForKeys(key1, key2));

    assertTrue(evts.isEmpty());

    caches[0].invoke(key1, new TransformerWithInjection());

    checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), primaryIdsForKeys(key1));

    assertTrue(evts.isEmpty());

    caches[0].invokeAll(keys, new TransformerWithInjection());

    checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), primaryIdsForKeys(key1, key2));
}
/**
* Check TRANSACTIONAL_SNAPSHOT cache: initializes the grids with the TRANSACTIONAL_SNAPSHOT atomicity
* mode and the given parameters, then runs the common transactional checks.
*
* @param cacheMode Cache mode.
* @param txConcurrency TX concurrency (configured as the default one).
* @param txIsolation TX isolation (configured as the default one).
* @throws Exception If failed.
*/
private void checkMvccTx(CacheMode cacheMode, TransactionConcurrency txConcurrency,
TransactionIsolation txIsolation) throws Exception {
initialize(cacheMode, TRANSACTIONAL_SNAPSHOT, txConcurrency, txIsolation);
checkTx0();
}
/**
* Check TRANSACTIONAL cache: initializes the grids with the TRANSACTIONAL atomicity mode and the
* given parameters, then runs the common transactional checks.
*
* @param cacheMode Cache mode.
* @param txConcurrency TX concurrency (configured as the default one).
* @param txIsolation TX isolation (configured as the default one).
* @throws Exception If failed.
*/
private void checkTx(CacheMode cacheMode, TransactionConcurrency txConcurrency,
TransactionIsolation txIsolation) throws Exception {
initialize(cacheMode, TRANSACTIONAL, txConcurrency, txIsolation);
checkTx0();
}
/**
 * Check TX cache: runs {@code invoke} and {@code invokeAll} from node 0 with both entry processors
 * and verifies that events were recorded exactly on the nodes returned by {@code idsForKeys(...)}
 * for the touched keys.
 */
private void checkTx0() {
    System.out.println("BEFORE: " + evts.size());

    caches[0].invoke(key1, new Transformer());

    System.out.println("AFTER: " + evts.size());

    checkEventNodeIdsStrict(Transformer.class.getName(), idsForKeys(key1));

    // A JUnit assertion is used instead of the 'assert' keyword so the check does not depend on -ea.
    assertTrue(evts.isEmpty());

    caches[0].invokeAll(keys, new Transformer());

    checkEventNodeIdsStrict(Transformer.class.getName(), idsForKeys(key1, key2));

    assertTrue(evts.isEmpty());

    System.out.println("BEFORE: " + evts.size());

    caches[0].invoke(key1, new TransformerWithInjection());

    System.out.println("AFTER: " + evts.size());

    checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), idsForKeys(key1));

    assertTrue(evts.isEmpty());

    caches[0].invokeAll(keys, new TransformerWithInjection());

    checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), idsForKeys(key1, key2));
}
/**
* Get node IDs where the given keys must reside, without restricting the result to primary nodes.
* Delegates to {@code idsForKeys(false, keys)}.
*
* @param keys Keys.
* @return Node IDs.
*/
private UUID[] idsForKeys(int... keys) {
return idsForKeys(false, keys);
}
/**
* Get primary node IDs where the given keys must reside.
* Delegates to {@code idsForKeys(true, keys)}.
*
* @param keys Keys.
* @return Node IDs.
*/
private UUID[] primaryIdsForKeys(int... keys) {
return idsForKeys(true, keys);
}
/**
 * Collects, key by key, the IDs of the nodes on which an event is expected. The result holds one
 * entry per (key, node) pair, so the same node ID may appear several times.
 * <ul>
 * <li>LOCAL: node 0 for every key.</li>
 * <li>PARTITIONED: the primary node and, unless {@code primaryOnly} is set, the backup nodes.</li>
 * <li>REPLICATED: the node the key maps to if {@code primaryOnly} is set, all nodes otherwise.</li>
 * </ul>
 *
 * @param primaryOnly Primary only flag.
 * @param keys Keys.
 * @return Node IDs.
 */
private UUID[] idsForKeys(boolean primaryOnly, int... keys) {
    List<UUID> nodeIds = new ArrayList<>();

    for (int k : keys) {
        if (cacheMode == LOCAL) {
            // Operations are performed from the node with index 0.
            nodeIds.add(ids[0]);
        }
        else if (cacheMode == PARTITIONED) {
            for (int idx = 0; idx < GRID_CNT; idx++) {
                boolean expected = primary(idx, k) || (!primaryOnly && backup(idx, k));

                if (expected)
                    nodeIds.add(ids[idx]);
            }
        }
        else if (cacheMode == REPLICATED) {
            if (!primaryOnly)
                nodeIds.addAll(Arrays.asList(ids));
            else
                nodeIds.add(grid(0).affinity(CACHE_NAME).mapKeyToNode(k).id());
        }
    }

    return nodeIds.toArray(new UUID[nodeIds.size()]);
}
/**
 * Ensure that events were recorded on the given nodes and nowhere else. The number of recorded
 * events must equal the number of expected IDs. For every expected ID one recorded event from that
 * node is looked up, its closure class name is verified, and the event is removed from the recorded set.
 *
 * @param cClsName Entry processor class name.
 * @param ids Expected node IDs (one per expected event); {@code null} means no events are expected.
 */
private void checkEventNodeIdsStrict(String cClsName, UUID... ids) {
    if (ids == null) {
        assertTrue(evts.isEmpty());

        return;
    }

    assertEquals(ids.length, evts.size());

    for (UUID id : ids) {
        CacheEvent foundEvt = null;

        for (CacheEvent evt : evts) {
            if (F.eq(id, evt.node().id())) {
                assertEquals(cClsName, evt.closureClassName());

                foundEvt = evt;

                break;
            }
        }

        if (foundEvt != null) {
            // Consume the event so that a repeated ID needs its own event.
            evts.remove(foundEvt);

            continue;
        }

        // Resolve the grid index to enrich the failure message (the parameter shadows the field).
        int gridIdx = -1;

        for (int i = 0; i < GRID_CNT; i++) {
            if (F.eq(this.ids[i], id)) {
                gridIdx = i;

                break;
            }
        }

        // Do not query affinity with an invalid index: that would hide the real failure.
        if (gridIdx < 0)
            fail("Expected transform event was not triggered on the node [nodeId=" + id +
                ", gridIdx=unknown]");

        fail("Expected transform event was not triggered on the node [nodeId=" + id +
            ", key1Primary=" + primary(gridIdx, key1) + ", key1Backup=" + backup(gridIdx, key1) +
            ", key2Primary=" + primary(gridIdx, key2) + ", key2Backup=" + backup(gridIdx, key2) + ']');
    }
}
/**
 * Entry processor that increments the integer value of the entry by one.
 */
private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable {
    /** {@inheritDoc} */
    @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
        int incremented = e.getValue() + 1;

        e.setValue(incremented);

        return null;
    }
}
/**
 * Entry processor that increments the integer value of the entry by one and additionally
 * asserts that an {@link Ignite} instance was injected into it.
 */
private static class TransformerWithInjection implements EntryProcessor<Integer, Integer, Void>, Serializable {
    /** Injected Ignite instance; transient, so it is not part of the serialized state. */
    @IgniteInstanceResource
    private transient Ignite ignite;

    /** {@inheritDoc} */
    @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
        assert ignite != null;

        int incremented = e.getValue() + 1;

        e.setValue(incremented);

        return null;
    }
}
}