blob: 8c1bb48092ce18c01364f82c19d60fb735ec5c9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
/**
* Test dynamic WAL mode change.
*/
public abstract class WalModeChangeCommonAbstractSelfTest extends GridCommonAbstractTest {
/** Node filter. */
private static final IgnitePredicate<ClusterNode> FILTER = new CacheNodeFilter();
/** Cache name. */
protected static final String CACHE_NAME = "cache";
/** Cache name 2. */
protected static final String CACHE_NAME_2 = "cache_2";
/** Cache name 2. */
protected static final String CACHE_NAME_3 = "cache_3";
/** Volatile data region. */
protected static final String REGION_VOLATILE = "volatile";
/** Server 1. */
protected static final String SRV_1 = "srv_1";
/** Server 2. */
protected static final String SRV_2 = "srv_2";
/** Server 3. */
protected static final String SRV_3 = "srv_3";
/** Client. */
protected static final String CLI = "cli";
/** Client 2. */
protected static final String CLI_2 = "cli_2";
/** Node attribute for filtering. */
protected static final String FILTER_ATTR = "FILTER";
/** Whether this is JDBC test. */
protected final boolean jdbc;
/**
* Constructor.
*
* @param jdbc Whether this is JDBC test.
*/
protected WalModeChangeCommonAbstractSelfTest(boolean jdbc) {
this.jdbc = jdbc;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
for (Ignite node0 : Ignition.allGrids()) {
Collection<String> cacheNames = node0.cacheNames();
for (String cacheName : cacheNames)
destroyCache(node0, cacheName);
}
awaitPartitionMapExchange();
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
for (Ignite node0 : Ignition.allGrids()) {
if (!node0.cacheNames().isEmpty())
return false;
}
return true;
}
}, 2000));
}
/**
* Create cache.
*
* @param node Node.
* @param ccfg Cache configuration.
*/
@SuppressWarnings("unchecked")
protected void createCache(Ignite node, CacheConfiguration ccfg) throws IgniteCheckedException {
node.getOrCreateCache(ccfg);
alignCacheTopologyVersion(node);
}
/**
* Destroy cache.
*
* @param node Node.
* @param cacheName Cache name.
*/
protected void destroyCache(Ignite node, String cacheName) throws IgniteCheckedException {
node.destroyCache(cacheName);
alignCacheTopologyVersion(node);
}
/**
* Waits for the topology version to be not less than one registered on source node.
*
* @param src Source node.
* @throws IgniteCheckedException If failed to wait on affinity ready future.
*/
protected void alignCacheTopologyVersion(Ignite src) throws IgniteCheckedException {
AffinityTopologyVersion topVer = ((IgniteEx)src).context().cache().context().exchange().readyAffinityVersion();
info("Will wait for topology version on all nodes: " + topVer);
for (Ignite ignite : Ignition.allGrids()) {
IgniteInternalFuture<?> ready = ((IgniteEx)ignite).context().cache().context().exchange()
.affinityReadyFuture(topVer);
if (ready != null)
ready.get();
}
}
/**
* Enable WAL.
*
* @param node Node.
* @param cacheName Cache name.
* @return Result.
*/
protected boolean walEnable(Ignite node, String cacheName) {
return node.cluster().enableWal(cacheName);
}
/**
* Disable WAL.
*
* @param node Node.
* @param cacheName Cache name.
* @return Result.
*/
protected boolean walDisable(Ignite node, String cacheName) {
return node.cluster().disableWal(cacheName);
}
/**
* Enable WAL.
*
* @param node Node.
* @param cacheName Cache name.
* @param expRes Expected result.
*/
protected void assertWalEnable(Ignite node, String cacheName, boolean expRes) {
boolean res = walEnable(node, cacheName);
assertEquals(expRes, res);
}
/**
* Disable WAL.
*
* @param node Node.
* @param cacheName Cache name.
* @param expRes Expected result.
*/
protected void assertWalDisable(Ignite node, String cacheName, boolean expRes) {
boolean res = walDisable(node, cacheName);
if (!jdbc)
assertEquals(expRes, res);
}
/**
* Assert WAL state on all nodes.
*
* @param cacheName Cache name.
* @param expState Expected state.
* @throws IgniteCheckedException If failed.
*/
protected void assertForAllNodes(String cacheName, boolean expState) throws IgniteCheckedException {
for (final Ignite node : Ignition.allGrids()) {
info(">>> Checking WAL state on node: " + node.name());
assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return node.cluster().isWalEnabled(cacheName) == expState;
}
}, 1000L);
}
}
/**
* Ensure exception is thrown.
*
* @param cmd Command.
* @param errCls Expected error class.
* @param errMsg Expected error message.
*/
protected void assertThrows(Callable<Void> cmd, Class errCls, String errMsg) {
try {
cmd.call();
fail("Exception is not thrown");
}
catch (Exception e) {
if (jdbc) {
e = (Exception)e.getCause();
assert e instanceof SQLException : e.getClass().getName();
}
else
assert F.eq(errCls, e.getClass());
if (errMsg != null) {
assert e.getMessage() != null;
assert e.getMessage().startsWith(errMsg) : e.getMessage();
}
}
}
/**
* Execute certain logic for all nodes.
*
* @param task Task.
* @throws Exception If failed.
*/
protected void forAllNodes(IgniteInClosureX<Ignite> task) throws Exception {
for (Ignite node : Ignition.allGrids()) {
try {
info("");
info(">>> Executing test on node: " + node.name());
task.applyx(node);
}
finally {
for (Ignite node0 : Ignition.allGrids()) {
Collection<String> cacheNames = node0.cacheNames();
for (String cacheName : cacheNames)
destroyCache(node0, cacheName);
}
}
}
}
/**
* Create node configuration.
*
* @param name Name.
* @param cli Client flag.
* @param filter Whether node should be filtered out.
* @return Node configuration.
*/
protected IgniteConfiguration config(String name, boolean cli, boolean filter) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
cfg.setIgniteInstanceName(name);
cfg.setClientMode(cli);
cfg.setLocalHost("127.0.0.1");
DataRegionConfiguration regionCfg = new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE);
DataRegionConfiguration volatileRegionCfg = new DataRegionConfiguration().setName(REGION_VOLATILE)
.setPersistenceEnabled(false);
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
storageCfg.setDefaultDataRegionConfiguration(regionCfg);
storageCfg.setDataRegionConfigurations(volatileRegionCfg);
cfg.setDataStorageConfiguration(storageCfg);
if (filter)
cfg.setUserAttributes(Collections.singletonMap(FILTER_ATTR, true));
return cfg;
}
/**
* Create common cache configuration (default name, transactional).
*
* @param mode Mode.
* @return Cache configuration.
*/
protected CacheConfiguration cacheConfig(CacheMode mode) {
return cacheConfig(CACHE_NAME, mode, TRANSACTIONAL);
}
/**
* Create cache configuration.
*
* @param name Name.
* @param mode Mode.
* @param atomicityMode Atomicity mode.
* @return Cache configuration.
*/
protected CacheConfiguration cacheConfig(String name, CacheMode mode, CacheAtomicityMode atomicityMode) {
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setName(name);
ccfg.setCacheMode(mode);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setNodeFilter(FILTER);
return ccfg;
}
/**
* Cache node filter.
*/
protected static class CacheNodeFilter implements IgnitePredicate<ClusterNode> {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
Object filterAttr = node.attribute(FILTER_ATTR);
return filterAttr == null;
}
}
}