blob: 1f5ec6c846fbb83239fa483473f29be366b75da5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCommonAbstractTest {
/** Partitioned cache name. */
protected static final String PART_CACHE_NAME = "partitioned";
/** Replicated cache name. */
protected static final String REPL_CACHE_NAME = "replicated";
/** */
protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
/**
* {@inheritDoc}
*/
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
cfg.setCommunicationSpi(commSpi);
cfg.setCacheConfiguration(cacheConfiguration());
cfg.setClusterStateOnStart(ClusterState.INACTIVE);
DataStorageConfiguration dsCfg = new DataStorageConfiguration();
DataRegionConfiguration drCfg = new DataRegionConfiguration();
drCfg.setPersistenceEnabled(true);
dsCfg.setDefaultDataRegionConfiguration(drCfg);
cfg.setDataStorageConfiguration(dsCfg);
cfg.setGridLogger(listeningLog);
return cfg;
}
/**
*
*/
private CacheConfiguration<?, ?>[] cacheConfiguration() {
CacheConfiguration<?, ?> partitionedCcfg = new CacheConfiguration<>();
partitionedCcfg.setName(PART_CACHE_NAME);
partitionedCcfg.setWriteSynchronizationMode(FULL_SYNC);
partitionedCcfg.setBackups(2);
partitionedCcfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
partitionedCcfg.setAffinity(new Map6PartitionsTo6NodesTo2CellsAffinityFunction());
CacheConfiguration<?, ?> replicatedCcfg = new CacheConfiguration<>();
replicatedCcfg.setName(REPL_CACHE_NAME);
replicatedCcfg.setWriteSynchronizationMode(FULL_SYNC);
replicatedCcfg.setCacheMode(CacheMode.REPLICATED);
replicatedCcfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
return new CacheConfiguration[] {partitionedCcfg, replicatedCcfg};
}
/**
* {@inheritDoc}
*/
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
}
/**
* {@inheritDoc}
*/
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/**
*
*/
protected void awaitForSwitchOnNodeLeft(Ignite failed) throws IgniteInterruptedCheckedException {
assertTrue(GridTestUtils.waitForCondition(
() -> {
for (Ignite ignite : G.allGrids()) {
if (ignite == failed)
continue;
GridDhtPartitionsExchangeFuture fut =
((IgniteEx)ignite).context().cache().context().exchange().lastTopologyFuture();
if (!fut.exchangeId().isLeft())
return false;
}
return true;
}, 5000));
}
/**
*
*/
protected void blockRecoveryMessages() {
for (Ignite ignite : G.allGrids()) {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
return msg.getClass().equals(GridCacheTxRecoveryRequest.class);
}
});
}
}
/**
*
*/
protected void checkTransactionsCount(
Ignite orig,
int origCnt,
Ignite primary,
int primaryCnt,
List<Ignite> backupNodes,
int backupCnt,
List<Ignite> nearNodes,
int nearCnt,
Set<GridCacheVersion> vers) {
Function<Ignite, Collection<GridCacheVersion>> txs = ignite -> {
Collection<IgniteInternalTx> active = ((IgniteEx)ignite).context().cache().context().tm().activeTransactions();
// Transactions originally started at backups will be presented as single element.
return active.stream()
.map(IgniteInternalTx::nearXidVersion)
.filter(ver -> vers == null || vers.contains(ver))
.collect(Collectors.toSet());
};
if (orig != null)
assertEquals(origCnt, txs.apply(orig).size());
if (primary != null && primary != orig)
assertEquals(primaryCnt, txs.apply(primary).size());
for (Ignite backup : backupNodes)
if (backup != orig)
assertEquals(backupCnt, txs.apply(backup).size());
for (Ignite near : nearNodes)
if (near != orig)
assertEquals(nearCnt, txs.apply(near).size());
}
/**
*
*/
protected static class Map6PartitionsTo6NodesTo2CellsAffinityFunction extends RendezvousAffinityFunction {
/**
* Default constructor.
*/
public Map6PartitionsTo6NodesTo2CellsAffinityFunction() {
super(false, 6);
}
/**
* {@inheritDoc}
*/
@Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
List<List<ClusterNode>> res = new ArrayList<>(6);
int backups = affCtx.backups();
assert backups == 2;
if (affCtx.currentTopologySnapshot().size() == 6) {
List<ClusterNode> p0 = new ArrayList<>();
List<ClusterNode> p1 = new ArrayList<>();
List<ClusterNode> p2 = new ArrayList<>();
List<ClusterNode> p3 = new ArrayList<>();
List<ClusterNode> p4 = new ArrayList<>();
List<ClusterNode> p5 = new ArrayList<>();
// Cell 1.
p0.add(affCtx.currentTopologySnapshot().get(0));
p0.add(affCtx.currentTopologySnapshot().get(1));
p0.add(affCtx.currentTopologySnapshot().get(2));
p1.add(affCtx.currentTopologySnapshot().get(2));
p1.add(affCtx.currentTopologySnapshot().get(0));
p1.add(affCtx.currentTopologySnapshot().get(1));
p2.add(affCtx.currentTopologySnapshot().get(1));
p2.add(affCtx.currentTopologySnapshot().get(2));
p2.add(affCtx.currentTopologySnapshot().get(0));
// Cell 2.
p3.add(affCtx.currentTopologySnapshot().get(3));
p3.add(affCtx.currentTopologySnapshot().get(4));
p3.add(affCtx.currentTopologySnapshot().get(5));
p4.add(affCtx.currentTopologySnapshot().get(5));
p4.add(affCtx.currentTopologySnapshot().get(3));
p4.add(affCtx.currentTopologySnapshot().get(4));
p5.add(affCtx.currentTopologySnapshot().get(4));
p5.add(affCtx.currentTopologySnapshot().get(5));
p5.add(affCtx.currentTopologySnapshot().get(3));
res.add(p0);
res.add(p1);
res.add(p2);
res.add(p3);
res.add(p4);
res.add(p5);
}
return res;
}
}
/**
* Specifies node starts the transaction (originating node).
*/
protected enum TransactionCoordinatorNode {
/** Primary. */
PRIMARY,
/** Backup. */
BACKUP,
/** Near. */
NEAR,
/** Client. */
CLIENT
}
}