blob: ca7fc0ab79b8d1a71edc90b2367ce2f54bcb32ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
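* Tests that in-place cache updates ({@code invoke}, {@code invokeAll} and {@code replace}) applied
* iteratively stay consistent on every node while nodes join or leave the cluster.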
*/
public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTest {
/** Number of nodes started before each test (see {@code beforeTest()}). */
private static final int GRID_CNT = 2;
/**
* Minimum number of increment rounds performed by {@code checkIncrement}.
* The base value 100 is passed through {@code SF.apply} (presumably a test scale factor - TODO confirm).
*/
private final int INCREMENTS = SF.apply(100);
/**
* Number of start/stop iterations in the node-leave test.
* The base value 10 is passed through {@code SF.applyLB(10, 2)} (presumably scaled with a lower bound of 2 - TODO confirm).
*/
private final int ITERATIONS = SF.applyLB(10, 2);
/** Number of keys ({@code "set-0"}, {@code "set-1"}, ...) updated on every increment round. */
private final int KEYS = SF.apply(50);
/**
 * Builds the node configuration: the parent configuration extended with the test cache and a
 * dedicated {@link TcpCommunicationSpi} whose shared memory port is set to {@code -1}.
 *
 * @param igniteInstanceName Name of the Ignite instance being configured.
 * @return Configuration for the given instance.
 * @throws Exception If the parent configuration cannot be created.
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration nodeCfg = super.getConfiguration(igniteInstanceName);

    TcpCommunicationSpi spi = new TcpCommunicationSpi();

    // NOTE(review): -1 presumably disables shared-memory communication - confirm against the SPI docs.
    spi.setSharedMemoryPort(-1);

    nodeCfg.setCommunicationSpi(spi);
    nodeCfg.setCacheConfiguration(cacheConfiguration());

    return nodeCfg;
}
/**
 * Creates the configuration of the default test cache: partitioned, one backup, fully synchronous
 * writes, synchronous rebalancing and the atomicity mode supplied by {@link #atomicityMode()}.
 *
 * @return Cache configuration.
 */
private CacheConfiguration cacheConfiguration() {
    CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);

    ccfg.setAtomicityMode(atomicityMode());
    ccfg.setCacheMode(PARTITIONED);
    ccfg.setBackups(1);
    ccfg.setRebalanceMode(SYNC);
    ccfg.setWriteSynchronizationMode(FULL_SYNC);

    return ccfg;
}
/**
 * Atomicity mode applied to every cache created by this test. Being {@code protected}, it can be
 * overridden by subclasses to run the same scenarios in another mode.
 *
 * @return Atomicity mode, {@code TRANSACTIONAL} in this class.
 */
protected CacheAtomicityMode atomicityMode() {
    return TRANSACTIONAL;
}
/**
 * Starts the initial {@link #GRID_CNT} nodes before every test.
 *
 * @throws Exception If the nodes fail to start.
 */
@Override protected void beforeTest() throws Exception {
    startGridsMultiThreaded(GRID_CNT, true);
}
/**
 * Stops every node after each test, including the ones started while the test was running.
 *
 * @throws Exception If stopping the nodes fails.
 */
@Override protected void afterTest() throws Exception {
    stopAllGrids();
}
/**
 * Runs the node-join scenario using single-key {@code invoke} calls.
 *
 * @throws Exception If failed.
 */
@Test
public void testSingleEntryProcessorNodeJoin() throws Exception {
    checkEntryProcessorNodeJoin(false);
}
/**
 * Runs the node-join scenario using batched {@code invokeAll} calls.
 *
 * @throws Exception If failed.
 */
@Test
public void testAllEntryProcessorNodeJoin() throws Exception {
    checkEntryProcessorNodeJoin(true);
}
/**
 * Checks that entry processor updates are applied exactly once while a node leaves the cluster.
 * <p>
 * On every iteration an extra node is started and then stopped from a background thread as soon as
 * the first update round has completed. Updates keep running until the node is stopped, after which
 * every remaining node must hold a set of exactly {@code increments} values for each key.
 * Odd iterations use {@code invokeAll}, even iterations use single {@code invoke} calls.
 *
 * @throws Exception If failed.
 */
@Test
public void testEntryProcessorNodeLeave() throws Exception {
    startGrid(GRID_CNT);

    // TODO: IGNITE-1525 (test fails with one-phase commit).
    boolean createCache = atomicityMode() != ATOMIC;

    String cacheName = DEFAULT_CACHE_NAME;

    if (createCache) {
        CacheConfiguration ccfg = cacheConfiguration();

        ccfg.setName("cache-2");
        ccfg.setBackups(2);

        ignite(0).createCache(ccfg);

        cacheName = ccfg.getName();
    }

    try {
        // Nodes with indexes [0, nodes) stay up for the whole test; only restartIdx is started and stopped.
        final int nodes = GRID_CNT + 1;
        final int restartIdx = GRID_CNT + 1;

        for (int iter = 0; iter < ITERATIONS; iter++) {
            log.info("Iteration: " + iter);

            startGrid(restartIdx);

            awaitPartitionMapExchange();

            final CountDownLatch latch = new CountDownLatch(1);

            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    // Wait until at least one update round is done before stopping the node.
                    latch.await();

                    stopGrid(restartIdx);

                    return null;
                }
            }, "stop-thread");

            int increments;

            try {
                increments = checkIncrement(cacheName, iter % 2 == 1, fut, latch);
            }
            finally {
                // Release the stop thread even when the very first update failed; otherwise it would
                // wait on the latch forever. Counting down an already released latch is a no-op.
                latch.countDown();
            }

            // JUnit assertion instead of the 'assert' keyword, so the check does not depend on the -ea JVM flag.
            assertTrue("Unexpected number of increments [increments=" + increments +
                ", min=" + INCREMENTS + ']', increments >= INCREMENTS);

            fut.get();

            for (int i = 0; i < KEYS; i++) {
                for (int g = 0; g < nodes; g++) {
                    Set<String> vals = ignite(g).<String, Set<String>>cache(cacheName).get("set-" + i);

                    assertNotNull(vals);
                    assertEquals(increments, vals.size());
                }
            }

            ignite(0).cache(cacheName).removeAll();
        }
    }
    finally {
        if (createCache)
            ignite(0).destroyCache(cacheName);
    }
}
/**
 * Runs increment rounds on node 0 while a background thread starts six additional nodes, one per
 * second, and then verifies that every node holds exactly {@code INCREMENTS} values for each key.
 *
 * @param invokeAll If {@code true} tests invokeAll operation.
 * @throws Exception If failed.
 */
private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception {
    final AtomicBoolean stop = new AtomicBoolean();

    final AtomicReference<Throwable> error = new AtomicReference<>();

    final int started = 6;

    IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Runnable() {
        @Override public void run() {
            try {
                for (int i = 0; i < started && !stop.get(); i++) {
                    U.sleep(1_000);

                    // Re-check after the sleep: the test may have finished in the meantime.
                    if (!stop.get())
                        startGrid(GRID_CNT + i);
                }
            }
            catch (Exception e) {
                // Remembered here and rethrown on the test thread once the future is done.
                error.compareAndSet(null, e);
            }
        }
    }, "starter");

    try {
        try {
            checkIncrement(DEFAULT_CACHE_NAME, invokeAll, null, null);
        }
        finally {
            // Always wait for the starter so that no node start is still in progress afterwards.
            fut.get(getTestTimeout());
        }

        Throwable startErr = error.get();

        // Surface a failed node start with its cause instead of silently ignoring it.
        if (startErr != null)
            throw new AssertionError("Failed to start node in the starter thread", startErr);

        for (int i = 0; i < KEYS; i++) {
            for (int g = 0; g < GRID_CNT + started; g++) {
                Set<String> vals = ignite(g).<String, Set<String>>cache(DEFAULT_CACHE_NAME).get("set-" + i);

                assertNotNull(vals);
                assertEquals(INCREMENTS, vals.size());
            }
        }
    }
    finally {
        stop.set(true);

        if (!fut.isDone())
            fut.cancel();
    }
}
/**
 * Performs increment rounds through node 0. Round {@code k} adds {@code "value-k"} to the set stored
 * under each of the {@code KEYS} keys and asserts that the entry processor reports a set size of
 * {@code k + 1}. At least {@code INCREMENTS} rounds are executed; when {@code fut} is given, rounds
 * continue until that future is done.
 *
 * @param cacheName Cache name.
 * @param invokeAll If {@code true} tests invokeAll operation.
 * @param fut If not null then executes updates while future is not done.
 * @param latch Latch to count down when first update is done.
 * @return Number of increments.
 * @throws Exception If failed.
 */
private int checkIncrement(
    String cacheName,
    boolean invokeAll,
    @Nullable IgniteInternalFuture<?> fut,
    @Nullable CountDownLatch latch) throws Exception {
    int rounds = 0;

    while (rounds < INCREMENTS || (fut != null && !fut.isDone())) {
        // Zero-based index of the round that is about to run.
        final int round = rounds++;

        final String val = "value-" + round;

        IgniteCache<String, Set<String>> cache = ignite(0).cache(cacheName);

        if (!invokeAll) {
            for (int keyIdx = 0; keyIdx < KEYS; keyIdx++) {
                String key = "set-" + keyIdx;

                Integer valsCnt = cache.invoke(key, new Processor(val));

                Integer exp = round + 1;

                // Log the actual cache content before failing to ease the investigation.
                if (!exp.equals(valsCnt))
                    log.info("Unexpected return value [valsCnt=" + valsCnt +
                        ", exp=" + exp +
                        ", cacheVal=" + cache.get(key) + ']');

                assertEquals(exp, valsCnt);
            }
        }
        else {
            Map<String, Processor> procs = new LinkedHashMap<>();

            for (int keyIdx = 0; keyIdx < KEYS; keyIdx++)
                procs.put("set-" + keyIdx, new Processor(val));

            Map<String, EntryProcessorResult<Integer>> resMap = cache.invokeAll(procs);

            for (String key : procs.keySet()) {
                EntryProcessorResult<Integer> res = resMap.get(key);

                assertNotNull(res);
                assertEquals(round + 1, (Object)res.get());
            }
        }

        // Signal that the first complete round has finished.
        if (latch != null && round == 0)
            latch.countDown();
    }

    return rounds;
}
/**
 * Checks conditional {@code replace(key, oldVal, newVal)} while nodes join the cluster.
 * <p>
 * 100 integer keys are initialised with {@code 0}. While a background thread starts six additional
 * nodes, one per second, the test thread keeps replacing every value with {@code value + 1} and
 * requires each replace to succeed. When the starter is finished, every node must return the last
 * written value for every key.
 *
 * @throws Exception If failed.
 */
@Test
public void testReplaceNodeJoin() throws Exception {
    final AtomicReference<Throwable> error = new AtomicReference<>();

    final int started = 6;

    int keys = 100;

    final AtomicBoolean stop = new AtomicBoolean(false);

    IgniteCache<Integer, Integer> cache0 = ignite(0).cache(DEFAULT_CACHE_NAME);

    for (int i = 0; i < keys; i++)
        cache0.put(i, 0);

    IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
        @Override public void run() {
            try {
                for (int i = 0; i < started && !stop.get(); i++) {
                    U.sleep(1_000);

                    // The loop condition ends the loop on the next check once the stop flag is set.
                    if (stop.get())
                        continue;

                    IgniteEx grid = startGrid(GRID_CNT + i);

                    info("Test started grid [idx=" + (GRID_CNT + i) + ", nodeId=" + grid.localNode().id() + ']');
                }
            }
            catch (Exception e) {
                // Remembered here and rethrown on the test thread once the future is done.
                error.compareAndSet(null, e);
            }
            finally {
                // Tells the updating loop below that no more nodes will be started.
                stop.set(true);
            }
        }
    }, 1, "starter");

    try {
        int updVal = 0;

        while (!stop.get()) {
            info("Will put: " + (updVal + 1));

            for (int i = 0; i < keys; i++)
                assertTrue("Failed [key=" + i + ", oldVal=" + updVal + ']',
                    cache0.replace(i, updVal, updVal + 1));

            updVal++;
        }

        fut.get(getTestTimeout());

        Throwable startErr = error.get();

        // Surface a failed node start with its cause instead of silently ignoring it.
        if (startErr != null)
            throw new AssertionError("Failed to start node in the starter thread", startErr);

        for (int i = 0; i < keys; i++) {
            for (int g = 0; g < GRID_CNT + started; g++) {
                Integer val = ignite(g).<Integer, Integer>cache(DEFAULT_CACHE_NAME).get(i);

                GridCacheEntryEx entry = ((IgniteKernal)grid(g)).internalCache(DEFAULT_CACHE_NAME).peekEx(i);

                // Null guard: comparing an int with a null Integer would throw an NPE on unboxing
                // and hide both the diagnostic message and the assertion below.
                if (val == null || updVal != val)
                    info("Invalid value for grid [g=" + g + ", entry=" + entry + ']');

                assertEquals(Integer.valueOf(updVal), val);
            }
        }
    }
    finally {
        stop.set(true);

        if (!fut.isDone())
            fut.cancel();
    }
}
/**
 * Entry processor that adds one string to the set stored under the entry, creating the set when the
 * entry has no value yet, and returns the size of the resulting set.
 */
private static class Processor implements EntryProcessor<String, Set<String>, Integer>, Serializable {
    /** Explicit serial version for the {@link Serializable} contract. */
    private static final long serialVersionUID = 0L;

    /** Value to add to the set; never reassigned after construction. */
    private final String val;

    /**
     * @param val Value.
     */
    private Processor(String val) {
        this.val = val;
    }

    /**
     * Adds {@code val} to the entry's set and writes the set back to the entry.
     *
     * @param e Entry holding the set of values, or no value at all.
     * @param args Ignored.
     * @return Size of the set after the value was added.
     */
    @Override public Integer process(MutableEntry<String, Set<String>> e, Object... args) {
        Set<String> vals = e.getValue();

        if (vals == null)
            vals = new HashSet<>();

        vals.add(val);

        e.setValue(vals);

        return vals.size();
    }
}
}