blob: c65455015deb224cdc1ddf731749a95482b075f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.baseline;
import javax.cache.CacheException;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
/**
* Checks that client affinity assignment cache is calculated correctly regardless of current baseline topology.
*/
public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstractTest {
/** Number of server nodes started by the baseline-change tests. */
private static final int DEFAULT_NODES_COUNT = 5;
/** Name of the partitioned transactional cache (FULL_SYNC, 32 partitions). */
private static final String PARTITIONED_TX_CACHE_NAME = "p-tx-cache";
/** Name of the partitioned transactional cache with PRIMARY_SYNC mode and a different partition count (breaks collocation). */
private static final String PARTITIONED_TX_PRIM_SYNC_CACHE_NAME = "prim-sync";
/** Name of the partitioned transactional cache that is present only in the client static configuration. */
private static final String PARTITIONED_TX_CLIENT_CACHE_NAME = "p-tx-client-cache";
/** Name of the partitioned atomic cache (server static configuration only). */
private static final String PARTITIONED_ATOMIC_CACHE_NAME = "p-atomic-cache";
/** Name of the replicated transactional cache (server static configuration only). */
private static final String REPLICATED_TX_CACHE_NAME = "r-tx-cache";
/** Name of the replicated atomic cache. */
private static final String REPLICATED_ATOMIC_CACHE_NAME = "r-atomic-cache";
/** Instance name prefix that marks a node as a client in {@code getConfiguration}. */
private static final String CLIENT_GRID_NAME = "client";
/** Instance name fragment that marks a node as "flaky": such a node gets dedicated WAL, WAL archive and storage paths. */
private static final String FLAKY_NODE_NAME = "flaky";
/** Size of the key range used by preloading and load threads; also the number of preloading puts. */
private static final int ENTRIES = 3_000;
/** WAL directory name of the flaky node, resolved against the default store directory. */
public static final String FLAKY_WAL_PATH = "flakywal";
/** WAL archive directory name of the flaky node, resolved against the default store directory. */
public static final String FLAKY_WAL_ARCHIVE_PATH = "flakywalarchive";
/** Storage directory name of the flaky node, resolved against the default store directory. */
public static final String FLAKY_STORAGE_PATH = "flakystorage";
/**
 * {@inheritDoc}
 * <p>
 * Server nodes get a persistent default data region; nodes whose name contains {@link #FLAKY_NODE_NAME}
 * additionally get dedicated WAL, WAL archive and storage directories. Clients and servers are given
 * intentionally different static cache configurations.
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    boolean clientNode = igniteInstanceName.startsWith(CLIENT_GRID_NAME);

    // Persistence is configured for server nodes only.
    if (!clientNode) {
        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration();

        dfltRegionCfg.setPersistenceEnabled(true);
        dfltRegionCfg.setMaxSize(200 * 1024 * 1024);

        DataStorageConfiguration storageCfg = new DataStorageConfiguration();

        storageCfg.setDefaultDataRegionConfiguration(dfltRegionCfg);

        cfg.setDataStorageConfiguration(storageCfg);
    }

    // The flaky node keeps its files in separate directories so that a test can wipe them independently.
    if (igniteInstanceName.contains(FLAKY_NODE_NAME)) {
        File storeDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);

        DataStorageConfiguration flakyStorageCfg = cfg.getDataStorageConfiguration();

        flakyStorageCfg.setWalPath(new File(storeDir, FLAKY_WAL_PATH).getAbsolutePath());
        flakyStorageCfg.setWalArchivePath(new File(storeDir, FLAKY_WAL_ARCHIVE_PATH).getAbsolutePath());
        flakyStorageCfg.setStoragePath(new File(storeDir, FLAKY_STORAGE_PATH).getAbsolutePath());
    }

    cfg.setConsistentId(igniteInstanceName);

    // Caches that are statically configured on both clients and servers.
    List<CacheConfiguration> cacheCfgs = new ArrayList<>();

    cacheCfgs.add(cacheConfig(PARTITIONED_TX_CACHE_NAME));
    cacheCfgs.add(cacheConfig(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME));
    cacheCfgs.add(cacheConfig(REPLICATED_ATOMIC_CACHE_NAME));

    if (clientNode) {
        // Skip config in server static configuration to check that caches received on client join start correctly.
        cacheCfgs.add(cacheConfig(PARTITIONED_TX_CLIENT_CACHE_NAME));
    }
    else {
        // Skip some configs in client static configuration to check that clients receive correct cache descriptors.
        cacheCfgs.add(cacheConfig(PARTITIONED_ATOMIC_CACHE_NAME));
        cacheCfgs.add(cacheConfig(REPLICATED_TX_CACHE_NAME));
    }

    cfg.setCacheConfiguration(cacheCfgs.toArray(new CacheConfiguration[0]));

    // Enforce different MAC addresses to emulate a distributed environment by default.
    String macsOverride = UUID.randomUUID().toString();

    cfg.setUserAttributes(Collections.singletonMap(IgniteNodeAttributes.ATTR_MACS_OVERRIDE, macsOverride));

    return cfg;
}
/**
* {@inheritDoc}
* <p>
* Calls {@code stopAllGrids()} and {@code cleanPersistenceDir()} so that a test does not start with
* nodes or persistence files left over from a previous run.
*/
@Override protected void beforeTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/**
* {@inheritDoc}
* <p>
* Calls {@code stopAllGrids()} and {@code cleanPersistenceDir()} once a test is over, mirroring
* the cleanup done in {@code beforeTest()}.
*/
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/**
 * Builds the configuration of one of the caches known to this test.
 * <p>
 * All caches use a rendezvous affinity function. Partitioned caches have two backups, replicated caches
 * have their cache mode set to {@code REPLICATED}. The "prim-sync" cache differs in write synchronization
 * mode and partition count.
 *
 * @param cacheName Cache name, must be one of the cache name constants of this class.
 * @return Cache configuration.
 * @throws IllegalArgumentException If cache name is not one of the known names.
 */
private CacheConfiguration<Integer, String> cacheConfig(String cacheName) {
    boolean partAtomic = PARTITIONED_ATOMIC_CACHE_NAME.equals(cacheName);
    boolean partTx = PARTITIONED_TX_CACHE_NAME.equals(cacheName) ||
        PARTITIONED_TX_CLIENT_CACHE_NAME.equals(cacheName);
    boolean primSync = PARTITIONED_TX_PRIM_SYNC_CACHE_NAME.equals(cacheName);
    boolean replAtomic = REPLICATED_ATOMIC_CACHE_NAME.equals(cacheName);
    boolean replTx = REPLICATED_TX_CACHE_NAME.equals(cacheName);

    if (!(partAtomic || partTx || primSync || replAtomic || replTx))
        throw new IllegalArgumentException("Unexpected cache name");

    CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>();

    ccfg.setName(cacheName);

    ccfg.setAtomicityMode(partAtomic || replAtomic
        ? CacheAtomicityMode.ATOMIC
        : CacheAtomicityMode.TRANSACTIONAL);

    ccfg.setWriteSynchronizationMode(primSync
        ? CacheWriteSynchronizationMode.PRIMARY_SYNC
        : CacheWriteSynchronizationMode.FULL_SYNC);

    // A different partition count for the "prim-sync" cache breaks collocation with the other caches.
    ccfg.setAffinity(new RendezvousAffinityFunction(false, primSync ? 41 : 32));

    if (replAtomic || replTx)
        ccfg.setCacheMode(CacheMode.REPLICATED);
    else
        ccfg.setBackups(2);

    return ccfg;
}
/**
* Runs the baseline-down scenario against the partitioned atomic cache, activating the cluster
* before clients join.
*
* @throws Exception If failed.
*/
@Test
public void testPartitionedAtomicCache() throws Exception {
testChangingBaselineDown(PARTITIONED_ATOMIC_CACHE_NAME, false);
}
/**
* Runs the baseline-down scenario against the partitioned transactional cache, activating the cluster
* before clients join.
*
* @throws Exception If failed.
*/
@Test
public void testPartitionedTxCache() throws Exception {
testChangingBaselineDown(PARTITIONED_TX_CACHE_NAME, false);
}
/**
* Test that activation after client join won't break cache: runs the baseline-down scenario against
* the partitioned transactional cache with the late activation flag set.
*
* @throws Exception If failed.
*/
@Test
public void testLateActivation() throws Exception {
testChangingBaselineDown(PARTITIONED_TX_CACHE_NAME, true);
}
/**
* Runs the baseline-down scenario against the replicated atomic cache, activating the cluster
* before clients join.
*
* @throws Exception If failed.
*/
@Test
public void testReplicatedAtomicCache() throws Exception {
testChangingBaselineDown(REPLICATED_ATOMIC_CACHE_NAME, false);
}
/**
* Runs the baseline-down scenario against the replicated transactional cache, activating the cluster
* before clients join.
*
* @throws Exception If failed.
*/
@Test
public void testReplicatedTxCache() throws Exception {
testChangingBaselineDown(REPLICATED_TX_CACHE_NAME, false);
}
/**
 * Tests that changing baseline down under load won't break cache.
 * <p>
 * Scenario: start {@link #DEFAULT_NODES_COUNT} servers, preload the cache, run three simple and three
 * transactional load threads from two clients, stop the two last servers and then shrink the baseline
 * topology twice, checking after every step that all load threads keep making progress.
 *
 * @param cacheName Name of the cache to put under load.
 * @param lateActivation If {@code true}, clients are started before the cluster is activated,
 *      otherwise they are started after preloading.
 * @throws Exception If failed.
 */
private void testChangingBaselineDown(String cacheName, boolean lateActivation) throws Exception {
    IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT);

    ig0.cluster().baselineAutoAdjustEnabled(false);

    IgniteEx client1 = null;
    IgniteEx client2 = null;

    // With late activation clients join an inactive cluster.
    if (lateActivation) {
        client1 = startClientGrid("client1");
        client2 = startClientGrid("client2");
    }

    ig0.cluster().active(true);

    AtomicBoolean stopLoad = new AtomicBoolean(false);
    AtomicReference<Throwable> loadError = new AtomicReference<>(null);

    // The flag is raised in 'finally' so that load threads do not outlive a failed test.
    try {
        IgniteCache<Integer, String> cache = ig0.cache(cacheName);

        System.out.println("### Starting preloading");

        for (int i = 0; i < ENTRIES; i++) {
            ThreadLocalRandom r = ThreadLocalRandom.current();

            byte[] randBytes = new byte[r.nextInt(10, 100)];

            cache.put(r.nextInt(ENTRIES), new String(randBytes));
        }

        System.out.println("### Preloading is finished");

        if (!lateActivation) {
            client1 = startClientGrid("client1");
            client2 = startClientGrid("client2");
        }

        ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>();

        startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker);
        startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker);
        startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker);
        startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker);
        startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker);
        startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker);

        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

        // Remember the full baseline before any server is stopped.
        List<BaselineNode> fullBlt = new ArrayList<>();

        for (int i = 0; i < DEFAULT_NODES_COUNT; i++)
            fullBlt.add(grid(i).localNode());

        stopGrid(DEFAULT_NODES_COUNT - 1, true);
        stopGrid(DEFAULT_NODES_COUNT - 2, true);

        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 1, loadError, threadProgressTracker);
        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 2, loadError, threadProgressTracker);
    }
    finally {
        stopLoad.set(true);
    }
}
/**
 * Tests that rejoin of baseline node with clear LFS under load won't break cache.
 * <p>
 * Scenario: start the cluster with one "flaky" server, preload four caches, run load from two clients,
 * stop the flaky node, delete its WAL, WAL archive and storage directories, start it again and check
 * that all load threads keep making progress.
 *
 * @throws Exception If failed.
 */
@Test
public void testRejoinWithCleanLfs() throws Exception {
    IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT - 1);

    startGrid(FLAKY_NODE_NAME);

    ig0.cluster().active(true);

    AtomicBoolean stopLoad = new AtomicBoolean(false);
    AtomicReference<Throwable> loadError = new AtomicReference<>(null);

    // The flag is raised in 'finally' so that load threads do not outlive a failed test.
    try {
        IgniteCache<Integer, String> cache1 = ig0.cache(PARTITIONED_ATOMIC_CACHE_NAME);
        IgniteCache<Integer, String> cache2 = ig0.cache(PARTITIONED_TX_CACHE_NAME);
        IgniteCache<Integer, String> cache3 = ig0.cache(REPLICATED_ATOMIC_CACHE_NAME);
        IgniteCache<Integer, String> cache4 = ig0.cache(REPLICATED_TX_CACHE_NAME);

        System.out.println("### Starting preloading");

        for (int i = 0; i < ENTRIES; i++) {
            ThreadLocalRandom r = ThreadLocalRandom.current();

            cache1.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
            cache2.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
            cache3.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
            cache4.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
        }

        System.out.println("### Preloading is finished");

        IgniteEx client1 = startClientGrid("client1");
        IgniteEx client2 = startClientGrid("client2");

        ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>();

        startSimpleLoadThread(client1, PARTITIONED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
        startSimpleLoadThread(client1, PARTITIONED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
        startSimpleLoadThread(client1, REPLICATED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
        startTxLoadThread(client2, PARTITIONED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
        startTxLoadThread(client2, PARTITIONED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
        startTxLoadThread(client2, REPLICATED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker);

        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

        stopGrid(FLAKY_NODE_NAME);

        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

        // Wipe everything the flaky node has persisted, so that it rejoins with a clean LFS.
        File store = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);

        U.delete(new File(store, FLAKY_WAL_PATH));
        U.delete(new File(store, FLAKY_WAL_ARCHIVE_PATH));
        U.delete(new File(store, FLAKY_STORAGE_PATH));

        startGrid(FLAKY_NODE_NAME);

        System.out.println("### Starting rebalancing after flaky node join");

        awaitPartitionMapExchange();

        System.out.println("### Rebalancing is finished after flaky node join");

        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
    }
    finally {
        stopLoad.set(true);
    }
}
/**
 * Test that changing baseline down under cross-cache txs load won't break cache.
 * <p>
 * Scenario: start {@link #DEFAULT_NODES_COUNT} servers, preload two transactional caches with different
 * partition counts, run six cross-cache transactional load threads from two clients, stop the two last
 * servers and then shrink the baseline topology twice, checking progress of all load threads on the way.
 *
 * @throws Exception If failed.
 */
@Test
public void testCrossCacheTxs() throws Exception {
    IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT);

    ig0.cluster().baselineAutoAdjustEnabled(false);
    ig0.cluster().active(true);

    AtomicBoolean stopLoad = new AtomicBoolean(false);
    AtomicReference<Throwable> loadError = new AtomicReference<>(null);

    // The flag is raised in 'finally' so that load threads do not outlive a failed test.
    try {
        String cacheName1 = PARTITIONED_TX_CACHE_NAME;
        String cacheName2 = PARTITIONED_TX_PRIM_SYNC_CACHE_NAME;

        IgniteCache<Integer, String> cache1 = ig0.cache(cacheName1);
        IgniteCache<Integer, String> cache2 = ig0.cache(cacheName2);

        System.out.println("### Starting preloading");

        for (int i = 0; i < ENTRIES; i++) {
            ThreadLocalRandom r = ThreadLocalRandom.current();

            byte[] randBytes1 = new byte[r.nextInt(10, 100)];
            byte[] randBytes2 = new byte[r.nextInt(10, 100)];

            cache1.put(r.nextInt(ENTRIES), new String(randBytes1));
            cache2.put(r.nextInt(ENTRIES), new String(randBytes2));
        }

        System.out.println("### Preloading is finished");

        IgniteEx client1 = startClientGrid("client1");
        IgniteEx client2 = startClientGrid("client2");

        ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>();

        // On each client one of the three threads uses the caches in the opposite order.
        startCrossCacheTxLoadThread(client1, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
        startCrossCacheTxLoadThread(client1, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
        startCrossCacheTxLoadThread(client1, cacheName2, cacheName1, stopLoad, loadError, threadProgressTracker);
        startCrossCacheTxLoadThread(client2, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
        startCrossCacheTxLoadThread(client2, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
        startCrossCacheTxLoadThread(client2, cacheName2, cacheName1, stopLoad, loadError, threadProgressTracker);

        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

        // Remember the full baseline before any server is stopped.
        List<BaselineNode> fullBlt = new ArrayList<>();

        for (int i = 0; i < DEFAULT_NODES_COUNT; i++)
            fullBlt.add(grid(i).localNode());

        stopGrid(DEFAULT_NODES_COUNT - 1, true);
        stopGrid(DEFAULT_NODES_COUNT - 2, true);

        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 1, loadError, threadProgressTracker);
        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 2, loadError, threadProgressTracker);
    }
    finally {
        stopLoad.set(true);
    }
}
/**
 * Tests that join of non-baseline node while long transactions are running won't break dynamically started cache.
 *
 * @throws Exception If failed.
 */
@Test
public void testDynamicCacheLongTransactionNodeStart() throws Exception {
    IgniteEx ig0 = (IgniteEx)startGrids(4);

    ig0.cluster().active(true);

    IgniteEx client = startClientGrid("client");

    // Reuse the replicated tx cache settings under a new name to get a dynamically started cache.
    CacheConfiguration<Integer, String> dynamicCacheCfg = cacheConfig(REPLICATED_TX_CACHE_NAME);

    dynamicCacheCfg.setName("dyn");

    IgniteCache<Integer, String> dynamicCache = client.getOrCreateCache(dynamicCacheCfg);

    for (int i = 0; i < ENTRIES; i++)
        dynamicCache.put(i, "abacaba" + i);

    checkLongTxsDuringNodeStart(client, dynamicCache);
}

/**
 * Tests that if dynamic cache has no affinity nodes at the moment of start,
 * it will still work correctly when affinity nodes will appear.
 *
 * @throws Exception If failed.
 */
@Ignore("https://issues.apache.org/jira/browse/IGNITE-8652")
@Test
public void testDynamicCacheStartNoAffinityNodes() throws Exception {
    IgniteEx ig0 = startGrid(0);

    ig0.cluster().active(true);

    IgniteEx client = startClientGrid("client");

    // The node filter rejects the only server that is alive at the moment of cache start.
    CacheConfiguration<Integer, String> dynamicCacheCfg = new CacheConfiguration<Integer, String>()
        .setName("dyn")
        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
        .setAffinity(new RendezvousAffinityFunction(false, 32))
        .setBackups(2)
        .setNodeFilter(new ConsistentIdNodeFilter((Serializable)ig0.localNode().consistentId()));

    IgniteCache<Integer, String> dynamicCache = client.getOrCreateCache(dynamicCacheCfg);

    for (int i = 1; i < 4; i++)
        startGrid(i);

    resetBaselineTopology();

    for (int i = 0; i < ENTRIES; i++)
        dynamicCache.put(i, "abacaba" + i);

    checkLongTxsDuringNodeStart(client, dynamicCache);
}

/**
 * Tests that join of non-baseline node while long transactions are running won't break cache started on client join.
 *
 * @throws Exception If failed.
 */
@Test
public void testClientJoinCacheLongTransactionNodeStart() throws Exception {
    IgniteEx ig0 = (IgniteEx)startGrids(4);

    ig0.cluster().active(true);

    IgniteEx client = startClientGrid("client");

    IgniteCache<Integer, String> clientJoinCache = client.cache(PARTITIONED_TX_CLIENT_CACHE_NAME);

    for (int i = 0; i < ENTRIES; i++)
        clientJoinCache.put(i, "abacaba" + i);

    checkLongTxsDuringNodeStart(client, clientJoinCache);
}

/**
 * Common part of the long-transaction tests: opens ten pessimistic repeatable-read transactions, each
 * updating its own key and then waiting, starts node #4 in a separate thread, releases the transactions
 * one second later and verifies that all ten updates are visible through the given cache.
 * <p>
 * A failed transaction is only printed; it is detected by the final value check.
 * NOTE(review): the asynchronous node start is not awaited here, same as in the original tests.
 *
 * @param client Client node the transactions are started on.
 * @param cache Cache under test, keys {@code 0..9} are overwritten.
 * @throws Exception If failed.
 */
private void checkLongTxsDuringNodeStart(IgniteEx client, IgniteCache<Integer, String> cache) throws Exception {
    final int txCnt = 10;

    AtomicBoolean releaseTx = new AtomicBoolean(false);
    CountDownLatch allTxsDoneLatch = new CountDownLatch(txCnt);

    for (int i = 0; i < txCnt; i++) {
        final int i0 = i;

        GridTestUtils.runAsync(new Runnable() {
            @Override public void run() {
                try (Transaction tx = client.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
                    TransactionIsolation.REPEATABLE_READ)) {
                    cache.put(i0, "txtxtxtx" + i0);

                    // Keep the transaction open until the main thread allows to commit.
                    while (!releaseTx.get())
                        LockSupport.parkNanos(1_000_000);

                    tx.commit();

                    System.out.println("Tx #" + i0 + " committed");
                }
                catch (Throwable t) {
                    System.out.println("Tx #" + i0 + " failed");

                    t.printStackTrace();
                }
                finally {
                    allTxsDoneLatch.countDown();
                }
            }
        });
    }

    // Node join is triggered while all transactions above are still open.
    GridTestUtils.runAsync(new Runnable() {
        @Override public void run() {
            try {
                startGrid(4);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    U.sleep(1_000);

    releaseTx.set(true);

    allTxsDoneLatch.await();

    for (int i = 0; i < 10_000; i++)
        assertEquals("txtxtxtx" + (i % txCnt), cache.get(i % txCnt));
}
/**
 * Shrinks the baseline topology to the first {@code newBaselineSize} nodes of the initial baseline,
 * waits for partition map exchange and verifies that load threads are still alive and error-free.
 *
 * @param ig0 Ignite.
 * @param fullBlt Initial BLT list.
 * @param newBaselineSize New baseline size.
 * @param loadError Load error reference.
 * @param threadProgressTracker Thread progress tracker.
 * @throws Exception If failed.
 */
private void tryChangeBaselineDown(
    IgniteEx ig0,
    List<BaselineNode> fullBlt,
    int newBaselineSize,
    AtomicReference<Throwable> loadError,
    ConcurrentMap<Long, Long> threadProgressTracker
) throws Exception {
    String transition = (newBaselineSize + 1) + " -> " + newBaselineSize;

    System.out.println("### Changing BLT: " + transition);

    List<BaselineNode> newBlt = fullBlt.subList(0, newBaselineSize);

    ig0.cluster().setBaselineTopology(newBlt);

    System.out.println("### Starting rebalancing after BLT change: " + transition);

    awaitPartitionMapExchange();

    System.out.println("### Rebalancing is finished after BLT change: " + transition);

    awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);

    Throwable err = loadError.get();

    if (err == null)
        return;

    err.printStackTrace();

    fail("Unexpected error in load thread: " + err.toString());
}
/**
 * Starts a background thread that performs random single-key {@code put}, {@code remove} and {@code get}
 * operations on the given cache until the stop flag is raised. Every completed operation increments
 * the counter of the current thread in the progress tracker.
 * <p>
 * A {@link ClusterTopologyException} (direct or as the cause of a {@link CacheException}) is handled by
 * waiting for its retry-ready future, if any. Other {@link CacheException}s are ignored and the loop
 * continues. Any other throwable is stored in {@code loadError} and stops all load threads.
 *
 * @param ig Ignite instance.
 * @param cacheName Cache name.
 * @param stopFlag Stop flag.
 * @param loadError Load error reference.
 * @param threadProgressTracker Progress tracker.
 */
private void startSimpleLoadThread(
    IgniteEx ig,
    String cacheName,
    AtomicBoolean stopFlag,
    AtomicReference<Throwable> loadError,
    ConcurrentMap<Long, Long> threadProgressTracker
) {
    GridTestUtils.runAsync(new Runnable() {
        @Override public void run() {
            ThreadLocalRandom r = ThreadLocalRandom.current();

            IgniteCache<Integer, String> cache = ig.cache(cacheName);

            try {
                while (!stopFlag.get()) {
                    try {
                        int op = r.nextInt(3);

                        switch (op) {
                            case 0:
                                byte[] randBytes = new byte[r.nextInt(10, 100)];

                                cache.put(r.nextInt(ENTRIES), new String(randBytes));

                                break;

                            case 1:
                                cache.remove(r.nextInt(ENTRIES));

                                break;

                            case 2:
                                cache.get(r.nextInt(ENTRIES));

                                break;
                        }

                        threadProgressTracker.compute(Thread.currentThread().getId(),
                            (tId, ops) -> ops == null ? 1 : ops + 1);
                    }
                    catch (CacheException e) {
                        if (e.getCause() instanceof ClusterTopologyException) {
                            IgniteFuture<?> retryFut = ((ClusterTopologyException)e.getCause()).retryReadyFuture();

                            // Retry future may be absent, in this case just retry right away.
                            if (retryFut != null)
                                retryFut.get();
                        }
                    }
                    catch (ClusterTopologyException e) {
                        IgniteFuture<?> retryFut = e.retryReadyFuture();

                        // Retry future may be absent, in this case just retry right away.
                        if (retryFut != null)
                            retryFut.get();
                    }
                }
            }
            catch (Throwable t) {
                // Only the first error is kept; all loaders are asked to stop.
                loadError.compareAndSet(null, t);

                stopFlag.set(true);
            }
        }
    });
}
/**
 * Starts a background thread that, until the stop flag is raised, runs repeatable-read transactions
 * swapping the values of two random existing keys of the given cache. Concurrency mode and the
 * commit/rollback decision are chosen once per thread. Every finished transaction increments the counter
 * of the current thread in the progress tracker.
 * <p>
 * A {@link ClusterTopologyException} (direct or as the cause of a {@link CacheException}) is handled by
 * waiting for its retry-ready future, if any. Other {@link CacheException}s are ignored and the loop
 * continues. Any other throwable is stored in {@code loadError} and stops all load threads.
 *
 * @param ig Ignite instance.
 * @param cacheName Cache name.
 * @param stopFlag Stop flag.
 * @param loadError Load error reference.
 * @param threadProgressTracker Progress tracker.
 */
private void startTxLoadThread(
    IgniteEx ig,
    String cacheName,
    AtomicBoolean stopFlag,
    AtomicReference<Throwable> loadError,
    ConcurrentMap<Long, Long> threadProgressTracker
) {
    GridTestUtils.runAsync(new Runnable() {
        @Override public void run() {
            ThreadLocalRandom r = ThreadLocalRandom.current();

            IgniteCache<Integer, String> cache = ig.cache(cacheName).withAllowAtomicOpsInTx();

            // Snapshot-mode caches always get pessimistic transactions, others pick the mode randomly.
            boolean pessimistic = atomicityMode(cache) == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT || r.nextBoolean();

            boolean rollback = r.nextBoolean();

            try {
                while (!stopFlag.get()) {
                    try (Transaction tx = ig.transactions().txStart(
                        pessimistic ? TransactionConcurrency.PESSIMISTIC : TransactionConcurrency.OPTIMISTIC,
                        TransactionIsolation.REPEATABLE_READ
                    )) {
                        int key1 = -1;
                        String val1 = null;

                        // Look for a key that currently has a value.
                        while (val1 == null) {
                            key1 = r.nextInt(ENTRIES);

                            val1 = cache.get(key1);
                        }

                        int key2 = -1;
                        String val2 = null;

                        while (val2 == null) {
                            key2 = r.nextInt(ENTRIES);

                            val2 = cache.get(key2);
                        }

                        cache.put(key1, val2);
                        cache.put(key2, val1);

                        if (rollback)
                            tx.rollback();
                        else
                            tx.commit();

                        threadProgressTracker.compute(Thread.currentThread().getId(),
                            (tId, ops) -> ops == null ? 1 : ops + 1);
                    }
                    catch (CacheException e) {
                        if (e.getCause() instanceof ClusterTopologyException) {
                            IgniteFuture<?> retryFut = ((ClusterTopologyException)e.getCause()).retryReadyFuture();

                            // Retry future may be absent, in this case just retry right away.
                            if (retryFut != null)
                                retryFut.get();
                        }
                    }
                    catch (ClusterTopologyException e) {
                        IgniteFuture<?> retryFut = e.retryReadyFuture();

                        // Retry future may be absent, in this case just retry right away.
                        if (retryFut != null)
                            retryFut.get();
                    }
                }
            }
            catch (Throwable t) {
                // Only the first error is kept; all loaders are asked to stop.
                loadError.compareAndSet(null, t);

                stopFlag.set(true);
            }
        }
    });
}
/**
 * Starts a background thread that, until the stop flag is raised, runs repeatable-read transactions
 * spanning two caches: a random existing key is read from each cache and the values are written back
 * crosswise. Concurrency mode and the commit/rollback decision are chosen once per thread. Every finished
 * transaction increments the counter of the current thread in the progress tracker.
 * <p>
 * A {@link ClusterTopologyException} (direct or as the cause of a {@link CacheException}) is handled by
 * waiting for its retry-ready future, if any. Other {@link CacheException}s are ignored and the loop
 * continues. Any other throwable is stored in {@code loadError} and stops all load threads.
 *
 * @param ig Ignite instance.
 * @param cacheName1 Cache name 1.
 * @param cacheName2 Cache name 2.
 * @param stopFlag Stop flag.
 * @param loadError Load error reference.
 * @param threadProgressTracker Progress tracker.
 */
private void startCrossCacheTxLoadThread(
    IgniteEx ig,
    String cacheName1,
    String cacheName2,
    AtomicBoolean stopFlag,
    AtomicReference<Throwable> loadError,
    ConcurrentMap<Long, Long> threadProgressTracker
) {
    GridTestUtils.runAsync(new Runnable() {
        @Override public void run() {
            ThreadLocalRandom r = ThreadLocalRandom.current();

            IgniteCache<Integer, String> cache1 = ig.cache(cacheName1);
            IgniteCache<Integer, String> cache2 = ig.cache(cacheName2);

            // If any of the caches is in snapshot mode the transaction is pessimistic, otherwise mode is random.
            boolean pessimistic = atomicityMode(cache1) == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT ||
                atomicityMode(cache2) == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT || r.nextBoolean();

            boolean rollback = r.nextBoolean();

            try {
                while (!stopFlag.get()) {
                    try (Transaction tx = ig.transactions().txStart(
                        pessimistic ? TransactionConcurrency.PESSIMISTIC : TransactionConcurrency.OPTIMISTIC,
                        TransactionIsolation.REPEATABLE_READ
                    )) {
                        int key1 = -1;
                        String val1 = null;

                        // Look for a key that currently has a value in the first cache.
                        while (val1 == null) {
                            key1 = r.nextInt(ENTRIES);

                            val1 = cache1.get(key1);
                        }

                        int key2 = -1;
                        String val2 = null;

                        // Same for the second cache.
                        while (val2 == null) {
                            key2 = r.nextInt(ENTRIES);

                            val2 = cache2.get(key2);
                        }

                        cache1.put(key1, val2);
                        cache2.put(key2, val1);

                        if (rollback)
                            tx.rollback();
                        else
                            tx.commit();

                        threadProgressTracker.compute(Thread.currentThread().getId(),
                            (tId, ops) -> ops == null ? 1 : ops + 1);
                    }
                    catch (CacheException e) {
                        if (e.getCause() instanceof ClusterTopologyException) {
                            IgniteFuture<?> retryFut = ((ClusterTopologyException)e.getCause()).retryReadyFuture();

                            // Retry future may be absent, in this case just retry right away.
                            if (retryFut != null)
                                retryFut.get();
                        }
                    }
                    catch (ClusterTopologyException e) {
                        IgniteFuture<?> retryFut = e.retryReadyFuture();

                        // Retry future may be absent, in this case just retry right away.
                        if (retryFut != null)
                            retryFut.get();
                    }
                }
            }
            catch (Throwable t) {
                // Only the first error is kept; all loaders are asked to stop.
                loadError.compareAndSet(null, t);

                stopFlag.set(true);
            }
        }
    });
}
/**
 * Waits until every load thread that is present in the progress tracker at the moment of the call has
 * changed its operations counter. Fails the test if a load error is reported meanwhile or if some of
 * those threads has made no progress within the timeout.
 * <p>
 * Threads that have not registered any progress before the initial snapshot is taken are not checked.
 *
 * @param waitMs Wait milliseconds.
 * @param loadError Load error.
 * @param threadProgressTracker Thread progress tracker.
 * @throws Exception If failed.
 */
private void awaitProgressInAllLoaders(
    long waitMs,
    AtomicReference<Throwable> loadError,
    ConcurrentMap<Long, Long> threadProgressTracker
) throws Exception {
    Map<Long, Long> initial = new HashMap<>(threadProgressTracker);

    long deadline = U.currentTimeMillis() + waitMs;

    while (U.currentTimeMillis() < deadline) {
        Map<Long, Long> current = new HashMap<>(threadProgressTracker);

        Throwable err = loadError.get();

        if (err != null)
            fail("Unexpected error in load thread: " + X.getFullStackTrace(err));

        boolean allAdvanced = true;

        for (Long threadId : initial.keySet()) {
            // Same counter value as in the initial snapshot means that the thread has not moved yet.
            if (initial.get(threadId).equals(current.get(threadId))) {
                allAdvanced = false;

                break;
            }
        }

        if (allAdvanced)
            return;

        U.sleep(100);
    }

    fail("No progress in load thread");
}
/**
 * Accepts all nodes except one with specified consistent ID.
 * <p>
 * Used as a cache node filter, so the instance is part of a cache configuration.
 */
private static class ConsistentIdNodeFilter implements IgnitePredicate<ClusterNode> {
    /** Explicit serial version UID: the predicate is serializable. */
    private static final long serialVersionUID = 0L;

    /** Consistent ID of the node to reject. */
    private final Serializable consId0;

    /**
     * @param consId0 Consistent ID of the node that must not pass the filter.
     */
    public ConsistentIdNodeFilter(Serializable consId0) {
        this.consId0 = consId0;
    }

    /**
     * @param node Node to check.
     * @return {@code false} for the node with the configured consistent ID, {@code true} for any other node.
     */
    @Override public boolean apply(ClusterNode node) {
        return !node.consistentId().equals(consId0);
    }
}
}