blob: 6dcc9dbd7ee15d15addf03f55e80955f8050fb90 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.transactions.Transaction;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVTS_CACHE;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
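* Base test for cache events: runs cache operations on every grid and waits for the expected
* number of PUT, READ and REMOVED events to be observed by a shared listener.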
*/
public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTest {
/** Enables verbose diagnostics: extra {@code info(...)} and {@code X.println(...)} output in this test. */
private static final boolean TEST_INFO = true;
/** Wait timeout passed to the compound event future's {@code get(...)} (presumably milliseconds - confirm). */
private static final long WAIT_TIMEOUT = 5000;
/** Key; also used as the prefix of the keys generated by {@code pairs(int)}. */
private static final String KEY = "key";
/** Number of grids, cached from {@code gridCount()} in {@code beforeTestsStarted()}. */
private static volatile int gridCnt;
/** Event listener; created in {@code beforeTestsStarted()} and registered on every grid for {@code EVTS_CACHE}. */
protected static volatile EventListener evtLsnr;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    // Take the parent configuration and include all event types (EVTS_ALL) on top of it.
    IgniteConfiguration baseCfg = super.getConfiguration(igniteInstanceName);

    return baseCfg.setIncludeEventTypes(EventType.EVTS_ALL);
}
/**
 * Tells whether the cache under test is partitioned. The value is handed to the listener
 * created by {@link #createEventListener()}; this base implementation always answers {@code false}.
 *
 * @return {@code True} if partitioned.
 */
protected boolean partitioned() {
    return false;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
    super.beforeTestsStarted();

    // Cache the grid count in the static field used by the expected-count tuples.
    int nodes = gridCount();

    gridCnt = nodes;

    // One shared listener instance is registered on every grid.
    EventListener lsnr = createEventListener();

    evtLsnr = lsnr;

    for (int idx = 0; idx < nodes; idx++)
        grid(idx).events().localListen(lsnr, EVTS_CACHE);
}
/**
 * Runs before every test method and delegates to {@link MvccFeatureChecker} so the test is
 * skipped when the {@code CACHE_EVENTS} feature is not supported.
 */
@Before
public void beforeGridCacheEventAbstractTest() {
    MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);
}
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
    // Same feature check as in the @Before hook, performed before the parent configuration is built.
    MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);

    CacheConfiguration parentCfg = super.cacheConfiguration(igniteInstanceName);

    return parentCfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
    super.beforeTest();

    if (TEST_INFO) {
        info("Called beforeTest() callback.");
    }

    // Every test starts with empty counters and no pending event futures.
    evtLsnr.reset();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
    if (TEST_INFO) {
        info("Called afterTest() callback.");
    }

    // Suspend counting while the parent class performs its after-test work.
    evtLsnr.stopListen();

    try {
        super.afterTest();
    }
    finally {
        // Counting is resumed even if the parent callback failed.
        evtLsnr.listen();
    }
}
/**
 * Creates the event listener that is registered on every grid. Subclasses may override this
 * to supply a different {@link EventListener} implementation.
 *
 * @return New listener configured with the current {@link #partitioned()} flag.
 */
protected EventListener createEventListener() {
    boolean part = partitioned();

    return new TestEventListener(part);
}
/**
 * Waits until the shared listener has observed the expected number of events of each given type.
 * When waiting fails, the expected and actual counters are logged before the failure is rethrown.
 *
 * @param gridIdx Grid index; used only for the diagnostic output.
 * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count.
 * @throws Exception If the listener failed to observe the expected counts (wrong count or timeout).
 */
private void waitForEvents(int gridIdx, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception {
    if (F.isEmpty(evtCnts))
        return;

    try {
        evtLsnr.waitForEventCount(evtCnts);
    }
    catch (IgniteCheckedException | RuntimeException e) {
        // The listener defined in this file reports a timeout as a RuntimeException, so unchecked
        // failures must be handled here too; otherwise counters are never printed on timeout.
        printEventCounters(gridIdx, evtCnts);

        throw e;
    }
}
/**
 * Logs, for every expected tuple, the event type together with the expected count and the
 * count currently reported by the listener.
 *
 * @param gridIdx Grid index.
 * @param expCnts Expected counters.
 */
private void printEventCounters(int gridIdx, IgniteBiTuple<Integer, Integer>[] expCnts) {
    info("Printing counters [gridIdx=" + gridIdx + ']');

    for (int i = 0; i < expCnts.length; i++) {
        IgniteBiTuple<Integer, Integer> exp = expCnts[i];

        Integer evtType = exp.get1();

        int actual = evtLsnr.eventCount(evtType);

        info("Event [evtType=" + evtType + ", expCnt=" + exp.get2() + ", actCnt=" + actual + ']');
    }
}
/**
 * Removes all entries from the cache of every grid and asserts that each cache is locally empty.
 * <p>
 * NOTE(review): this was described as clearing "without generating events", yet {@code runTest}
 * resets the listener right after this call to drop counters - presumably events may still be
 * recorded here; confirm.
 */
private void clearCaches() {
    int idx = 0;

    while (idx < gridCnt) {
        IgniteCache<String, Integer> c = jcache(idx);

        c.removeAll();

        assert c.localSize() == 0;

        idx++;
    }
}
/**
 * Executes the given scenario once per grid, against that grid's cache, and after each run
 * waits for the expected event counts. Listener state and caches are cleaned up after every
 * iteration, whether it succeeded or not.
 *
 * @param run {@link TestCacheRunnable} instance.
 * @param evtCnts Expected event counts for each iteration.
 * @throws Exception If failed.
 */
@SuppressWarnings({"CaughtExceptionImmediatelyRethrown"})
private void runTest(TestCacheRunnable run, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception {
    for (int idx = 0; idx < gridCount(); idx++) {
        String banner = ">>> Running test for grid [idx=" + idx + ", igniteInstanceName=" + grid(idx).name() +
            ", id=" + grid(idx).localNode().id() + ']';

        info(banner);

        try {
            IgniteCache<String, Integer> target = jcache(idx);

            run.run(target);

            waitForEvents(idx, evtCnts);
        }
        catch (Exception e) { // Leave this catch to be able to set breakpoint.
            throw e;
        }
        finally {
            // First reset: mainly needed to correctly clear event futures.
            evtLsnr.reset();

            clearCaches();

            // Second reset: drops the counters accumulated by the call above.
            evtLsnr.reset();
        }
    }
}
/**
 * Builds a map of {@code size} entries whose keys are {@code KEY + i} and whose values are
 * {@code i}, for {@code i} from 1 to {@code size} inclusive.
 *
 * @param size Pairs count.
 * @return Key-value pairs.
 */
private Map<String, Integer> pairs(int size) {
    Map<String, Integer> res = new HashMap<>(size);

    int num = 1;

    while (num <= size) {
        res.put(KEY + num, num);

        num++;
    }

    return res;
}
/**
 * Runs a non-transactional put / read / remove sequence on a single key and waits for
 * {@code gridCnt} PUT events, 3 READ events and {@code gridCnt} REMOVED events.
 * <p>
 * Note: the test was disabled for the REPLICATED cache case because of IGNITE-607.
 * This comment should be removed once the test passes stably.
 *
 * @throws Exception If test failed.
 */
@Test
public void testGetPutRemove() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            final String k = "key";
            final Integer v = 1;

            assert cache.getAndPut(k, v) == null;
            assert cache.containsKey(k);

            assertEquals(v, cache.get(k));
            assertEquals(v, cache.getAndRemove(k));

            assert !cache.containsKey(k);
        }
    };

    runTest(
        scenario,
        F.t(EVT_CACHE_OBJECT_PUT, gridCnt),
        F.t(EVT_CACHE_OBJECT_READ, 3),
        F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)
    );
}
/**
 * Runs a put / read / remove sequence on one key inside a transaction and, after commit,
 * waits for {@code gridCnt} REMOVED events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testGetPutRemoveTx1() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            // The transaction is started on the node that owns the cache under test.
            Ignite ignite = cache.unwrap(Ignite.class);

            try (Transaction tx = ignite.transactions().txStart()) {
                assert cache.getAndPut(k, v) == null;
                assert cache.containsKey(k);
                assert v.equals(cache.get(k));
                assert v.equals(cache.getAndRemove(k));
                assert !cache.containsKey(k);

                tx.commit();
            }

            assert !cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
}
/**
 * Runs put / read / remove / put on one key inside a transaction and, after commit,
 * waits for {@code gridCnt} PUT events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testGetPutRemoveTx2() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            Ignite ignite = cache.unwrap(Ignite.class);

            try (Transaction tx = ignite.transactions().txStart()) {
                assert cache.getAndPut(k, v) == null;
                assert cache.containsKey(k);
                assert v.equals(cache.get(k));
                assert v.equals(cache.getAndRemove(k));
                assert !cache.containsKey(k);

                // Put the value back so the key is present when the transaction commits.
                cache.put(k, v);

                assert cache.containsKey(k);

                tx.commit();
            }

            assert cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
}
/**
 * Asynchronous variant of the put / read / remove sequence on one key; waits for
 * {@code gridCnt} PUT events, 3 READ events and {@code gridCnt} REMOVED events.
 * <p>
 * Note: the test was disabled for the REPLICATED cache case because of IGNITE-607.
 * This comment should be removed once the test passes stably.
 *
 * @throws Exception If test failed.
 */
@Test
public void testGetPutRemoveAsync() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            assert cache.getAndPutAsync(k, v).get() == null;
            assert cache.containsKey(k);
            assert v.equals(cache.getAsync(k).get());
            assert v.equals(cache.getAndRemoveAsync(k).get());
            assert !cache.containsKey(k);
        }
    };

    runTest(
        scenario,
        F.t(EVT_CACHE_OBJECT_PUT, gridCnt),
        F.t(EVT_CACHE_OBJECT_READ, 3),
        F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt)
    );
}
/**
 * Runs an asynchronous put / read / remove sequence on one key inside a transaction and,
 * after commit, waits for {@code gridCnt} REMOVED events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testGetPutRemoveAsyncTx1() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            Ignite ignite = cache.unwrap(Ignite.class);

            try (Transaction tx = ignite.transactions().txStart()) {
                assert cache.getAndPutAsync(k, v).get() == null;
                assert cache.containsKey(k);
                assert v.equals(cache.getAsync(k).get());
                assert v.equals(cache.getAndRemoveAsync(k).get());
                assert !cache.containsKey(k);

                tx.commit();
            }

            assert !cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
}
/**
 * Runs asynchronous put / read / remove / put on one key inside a transaction and,
 * after commit, waits for {@code gridCnt} PUT events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testGetPutRemoveAsyncTx2() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            Ignite ignite = cache.unwrap(Ignite.class);

            try (Transaction tx = ignite.transactions().txStart()) {
                assert cache.getAndPutAsync(k, v).get() == null;
                assert cache.containsKey(k);
                assert v.equals(cache.getAsync(k).get());
                assert v.equals(cache.getAndRemoveAsync(k).get());
                assert !cache.containsKey(k);

                // Second put: the key was removed above, so the previous value is expected to be null.
                assert cache.getAndPutAsync(k, v).get() == null;
                assert cache.containsKey(k);

                tx.commit();
            }

            assert cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
}
/**
 * Runs a non-transactional {@code put} followed by {@code remove} on one key and waits for
 * {@code gridCnt} PUT events and {@code gridCnt} REMOVED events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testPutRemovex() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            cache.put(k, v);

            assert cache.containsKey(k);
            assert cache.remove(k);
            assert !cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_PUT, gridCnt), F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
}
/**
 * Runs {@code put} followed by {@code remove} on one key inside a transaction and,
 * after commit, waits for {@code gridCnt} REMOVED events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testPutRemovexTx1() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            Ignite ignite = cache.unwrap(Ignite.class);

            try (Transaction tx = ignite.transactions().txStart()) {
                cache.put(k, v);

                assert cache.containsKey(k);
                assert cache.remove(k);
                assert !cache.containsKey(k);

                tx.commit();
            }

            assert !cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_REMOVED, gridCnt));
}
/**
 * Runs {@code put}, {@code remove} and {@code put} again on one key inside a transaction and,
 * after commit, waits for {@code gridCnt} PUT events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testPutRemovexTx2() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Map.Entry<String, Integer> entry = F.first(pairs(1).entrySet());

            assert entry != null;

            String k = entry.getKey();
            Integer v = entry.getValue();

            Ignite ignite = cache.unwrap(Ignite.class);

            try (Transaction tx = ignite.transactions().txStart()) {
                cache.put(k, v);

                assert cache.containsKey(k);
                assert cache.remove(k);
                assert !cache.containsKey(k);

                // Put the value back so the key is present when the transaction commits.
                cache.put(k, v);

                assert cache.containsKey(k);

                tx.commit();
            }

            assert cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_PUT, gridCnt));
}
/**
 * Exercises {@code getAndPutIfAbsent} on one key and {@code putIfAbsent} on another, each
 * invoked twice, and waits for {@code 2 * gridCnt} PUT events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testPutIfAbsent() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Iterator<Map.Entry<String, Integer>> it = pairs(2).entrySet().iterator();

            // First pair: getAndPutIfAbsent.
            Map.Entry<String, Integer> entry = it.next();

            String k = entry.getKey();
            Integer v = entry.getValue();

            assertNull(cache.getAndPutIfAbsent(k, v));
            assertEquals(v, cache.getAndPutIfAbsent(k, v));

            assert cache.containsKey(k);

            // Second pair: putIfAbsent.
            entry = it.next();

            k = entry.getKey();
            v = entry.getValue();

            assert cache.putIfAbsent(k, v);
            assert !cache.putIfAbsent(k, v);
            assert cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt));
}
/**
 * Exercises {@code getAndPutIfAbsent} on one key and {@code putIfAbsent} on another, each
 * invoked twice inside a single transaction, and waits for {@code 2 * gridCnt} PUT events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testPutIfAbsentTx() throws Exception {
    runTest(new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator();

            Map.Entry<String, Integer> e = iter.next();

            String key = e.getKey();
            Integer val = e.getValue();

            // Start the transaction on the node that owns the cache under test (as the other
            // Tx tests do). Using grid(0) here would leave the operations on other grids'
            // caches outside of the transaction.
            try (Transaction tx = cache.unwrap(Ignite.class).transactions().txStart()) {
                assert cache.getAndPutIfAbsent(key, val) == null;

                assertEquals(val, cache.getAndPutIfAbsent(key, val));

                assert cache.containsKey(key);

                e = iter.next();

                key = e.getKey();
                val = e.getValue();

                assert cache.putIfAbsent(key, val);
                assert !cache.putIfAbsent(key, val);
                assert cache.containsKey(key);

                tx.commit();
            }

            assert cache.containsKey(key);
        }
    }, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt));
}
/**
 * Asynchronous variant: exercises {@code getAndPutIfAbsentAsync} on one key and
 * {@code putIfAbsentAsync} on another, each invoked twice, and waits for
 * {@code 2 * gridCnt} PUT events.
 *
 * @throws Exception If test failed.
 */
@Test
public void testPutIfAbsentAsync() throws Exception {
    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Iterator<Map.Entry<String, Integer>> it = pairs(2).entrySet().iterator();

            // First pair: getAndPutIfAbsentAsync.
            Map.Entry<String, Integer> entry = it.next();

            String k = entry.getKey();
            Integer v = entry.getValue();

            assert cache.getAndPutIfAbsentAsync(k, v).get() == null;
            assert v.equals(cache.getAndPutIfAbsentAsync(k, v).get());
            assert cache.containsKey(k);

            // Second pair: putIfAbsentAsync.
            entry = it.next();

            k = entry.getKey();
            v = entry.getValue();

            assert cache.putIfAbsentAsync(k, v).get().booleanValue();
            assert !cache.putIfAbsentAsync(k, v).get().booleanValue();
            assert cache.containsKey(k);
        }
    };

    runTest(scenario, F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt));
}
/**
 * Exercises {@code getAndPutIfAbsentAsync} and {@code putIfAbsentAsync}, each invoked twice,
 * inside an OPTIMISTIC / REPEATABLE_READ transaction and waits for {@code 2 * gridCnt} PUT events.
 *
 * @throws Exception If test failed.
 */
@SuppressWarnings("unchecked")
@Test
public void testPutIfAbsentAsyncTx() throws Exception {
    IgniteBiTuple[] expected = new IgniteBiTuple[] {F.t(EVT_CACHE_OBJECT_PUT, 2 * gridCnt)};

    TestCacheRunnable scenario = new TestCacheRunnable() {
        @Override public void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException {
            Iterator<Map.Entry<String, Integer>> it = pairs(2).entrySet().iterator();

            Ignite ignite = cache.unwrap(Ignite.class);

            // Optimistic transaction.
            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
                Map.Entry<String, Integer> entry = it.next();

                String k = entry.getKey();
                Integer v = entry.getValue();

                assert cache.getAndPutIfAbsentAsync(k, v).get() == null;
                assert v.equals(cache.getAndPutIfAbsentAsync(k, v).get());
                assert cache.containsKey(k);

                entry = it.next();

                k = entry.getKey();
                v = entry.getValue();

                assert cache.putIfAbsentAsync(k, v).get().booleanValue();
                assert !cache.putIfAbsentAsync(k, v).get().booleanValue();
                assert cache.containsKey(k);

                tx.commit();

                assert cache.containsKey(k);
            }
        }
    };

    runTest(scenario, expected);
}
/**
*
*/
private static interface TestCacheRunnable {
/**
* @param cache Cache.
* @throws IgniteCheckedException If any exception occurs.
*/
void run(IgniteCache<String, Integer> cache) throws IgniteCheckedException;
}
/**
* Event listener interface.
*/
protected static interface EventListener extends IgnitePredicate<Event> {
/**
* Start listen.
*/
void listen();
/**
* Stop listen.
*/
void stopListen();
/**
* Gets events count by type.
*
* @param type Type.
*/
int eventCount(int type);
/**
* Reset event counters.
*/
void reset();
/**
* @param evtCnts Event counters.
*/
void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException;
}
/**
 * Local event listener that counts received cache events per type and notifies the futures
 * of callers waiting for particular counts.
 */
private static class TestEventListener implements EventListener {
    /** Events count map. */
    private ConcurrentMap<Integer, AtomicInteger> cntrs = new ConcurrentHashMap<>();

    /** Event futures. */
    private Collection<EventTypeFuture> futs = new GridConcurrentHashSet<>();

    /** Whether incoming events are currently counted. */
    private volatile boolean listen = true;

    /** Partitioned flag, passed on to every created {@link EventTypeFuture}. */
    private boolean partitioned;

    /**
     * @param p Partitioned flag.
     */
    private TestEventListener(boolean p) {
        partitioned = p;
    }

    /** Resumes counting of incoming events. */
    @Override public void listen() {
        listen = true;
    }

    /** Suspends counting of incoming events. */
    @Override public void stopListen() {
        listen = false;
    }

    /**
     * @param type Event type.
     * @return Count, or {@code 0} if no event of that type has been recorded.
     */
    @Override public int eventCount(int type) {
        assert type > 0;

        AtomicInteger seen = cntrs.get(type);

        if (seen == null)
            return 0;

        return seen.get();
    }

    /**
     * Reset listener: drops all counters and all registered futures.
     */
    @Override public void reset() {
        cntrs.clear();

        futs.clear();
    }

    /** {@inheritDoc} */
    @Override public boolean apply(Event evt) {
        assert evt instanceof CacheEvent;

        if (listen) {
            if (TEST_INFO)
                X.println("Cache event: " + evt.shortDisplay());

            AtomicInteger cntr = F.addIfAbsent(cntrs, evt.type(), F.newAtomicInt());

            assert cntr != null;

            int total = cntr.incrementAndGet();

            // Tell every pending future about the new total for this event type.
            for (EventTypeFuture fut : futs)
                fut.onEvent(evt.type(), total);
        }

        return true;
    }

    /**
     * Waits for event count.
     *
     * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count.
     * @throws IgniteCheckedException If failed to wait.
     */
    @Override public void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts)
        throws IgniteCheckedException {
        if (F.isEmpty(evtCnts))
            return;

        // Compound future that aggregates one future per required event type.
        GridCompoundIdentityFuture<Object> all = new GridCompoundIdentityFuture<>();

        for (int i = 0; i < evtCnts.length; i++) {
            IgniteBiTuple<Integer, Integer> exp = evtCnts[i];

            Integer evtType = exp.get1();
            Integer expCnt = exp.get2();

            assert expCnt != null && expCnt > 0;

            EventTypeFuture fut = new EventTypeFuture(evtType, expCnt, partitioned);

            // Register first, then look at the counter, so events arriving in between are not lost.
            futs.add(fut);

            AtomicInteger cntr = cntrs.get(evtType);

            if (!fut.isDone()) {
                // Account for events that were counted before the future was registered.
                fut.onEvent(evtType, cntr == null ? 0 : cntr.get());
            }

            all.add(fut);
        }

        all.markInitialized();

        try {
            all.get(WAIT_TIMEOUT);
        }
        catch (IgniteFutureTimeoutCheckedException e) {
            throw new RuntimeException("Timed out waiting for events: " + all, e);
        }
    }
}
/**
 * Future that completes once the expected number of events of a single type has been
 * reported through {@link #onEvent(int, int)}, or fails when the reported count exceeds
 * what is allowed.
 */
private static class EventTypeFuture extends GridFutureAdapter<Object> {
    /** Event type this future tracks. */
    private int evtType;

    /** Expected count. */
    private int expCnt;

    /** Last count reported for the tracked type. */
    private int cnt;

    /** Partitioned flag. */
    private boolean partitioned;

    /**
     * @param evtType Event type.
     * @param expCnt Expected count.
     * @param partitioned Partitioned flag.
     */
    EventTypeFuture(int evtType, int expCnt, boolean partitioned) {
        assert expCnt > 0;

        this.evtType = evtType;
        this.expCnt = expCnt;
        this.partitioned = partitioned;
    }

    /**
     * @return Count.
     */
    int count() {
        return cnt;
    }

    /**
     * Records the latest count for an event type and completes the future when appropriate.
     * Calls for other event types, or after completion, are ignored.
     *
     * @param evtType Event type.
     * @param cnt Count.
     */
    void onEvent(int evtType, int cnt) {
        if (isDone() || this.evtType != evtType)
            return;

        if (TEST_INFO)
            X.println("EventTypeFuture.onEvent() [evtName=" + U.gridEventName(evtType) + ", evtType=" + evtType +
                ", cnt=" + cnt + ", expCnt=" + expCnt + ']');

        this.cnt = cnt;

        boolean exceeded = cnt > expCnt;

        // For partitioned caches we allow extra event for reads.
        boolean toleratedExtraRead = partitioned && evtType == EVT_CACHE_OBJECT_READ && cnt <= expCnt + 1;

        if (exceeded && !toleratedExtraRead)
            onDone(new IgniteCheckedException("Wrong event count [evtName=" + U.gridEventName(evtType) + ", evtType=" +
                evtType + ", expCnt=" + expCnt + ", actCnt=" + cnt + ", partitioned=" + partitioned + "]"));

        boolean exact = cnt == expCnt;
        boolean oneExtraOnPartitioned = partitioned && cnt == expCnt + 1;

        if (exact || oneExtraOnPartitioned)
            onDone();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(EventTypeFuture.class, this, "evtName", U.gridEventName(evtType));
    }
}
}