blob: 44d7ed8aaa9335ed7af5d9b57337c240402cd315 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.standbycluster.reconnect;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import com.google.common.collect.Sets;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
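/**
 * Base class for stand-by cluster client reconnect tests. Provides a persistence-enabled node configuration with
 * auto-activation disabled, start-up of two server nodes and one client node, and helpers asserting which caches
 * and cache descriptors are present on each node.
 */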
public abstract class IgniteAbstractStandByClientReconnectTest extends GridCommonAbstractTest {
/** IP finder used for every node except {@link #nodeClient}; see {@link #getConfiguration(String)}. */
private static final TcpDiscoveryVmIpFinder vmIpFinder = new TcpDiscoveryVmIpFinder(true);
/** IP finder used for {@link #nodeClient}; its address list is set in {@link #getConfiguration(String)}. */
private static final TcpDiscoveryVmIpFinder clientIpFinder = new TcpDiscoveryVmIpFinder(true);
/** Name of the first server node; also used as its consistent ID. */
protected static final String node1 = "node1";
/** Name of the second server node; also used as its consistent ID. */
protected static final String node2 = "node2";
/** Name of the client node; also used as its consistent ID. */
protected static final String nodeClient = "nodeClient";
/** Name of the static cache configured on {@link #node1}. */
protected static final String ccfg1staticName = "cache1static";
/** Name of the static cache configured on {@link #node2}. */
protected static final String ccfg2staticName = "cache2static";
/** Name of the static cache configured on {@link #nodeClient}. */
protected static final String ccfg3staticName = "cache3static";
/** Name of the static cache with a node filter configured on {@link #node1}. */
protected static final String ccfg1staticWithFilterName = "ccfg1staticWithFilter";
/** Name of the static cache with a node filter configured on {@link #node2}. */
protected static final String ccfg2staticWithFilterName = "ccfg2staticWithFilter";
/** Name of the static cache with a node filter configured on {@link #nodeClient}. */
protected static final String ccfg3staticWithFilterName = "ccfg3staticWithFilter";
/** Name of the dynamic cache. */
protected static final String ccfgDynamicName = "ccfgDynamic";
/** Name of the dynamic cache with a node filter. */
protected static final String ccfgDynamicWithFilterName = "ccfgDynamicWithFilter";
/** Static cache configuration passed to {@link #node1} in {@link #startNodes(CountDownLatch)}. */
protected final CacheConfiguration ccfg1static = new CacheConfiguration(ccfg1staticName);
/** Static cache configuration passed to {@link #node2} in {@link #startNodes(CountDownLatch)}. */
protected final CacheConfiguration ccfg2static = new CacheConfiguration(ccfg2staticName);
/** Static cache configuration passed to {@link #nodeClient} in {@link #startNodes(CountDownLatch)}. */
protected final CacheConfiguration ccfg3static = new CacheConfiguration(ccfg3staticName);
/** Static cache configuration passed to {@link #node1}; its node filter rejects {@link #node2}. */
protected final CacheConfiguration ccfg1staticWithFilter =
new CacheConfiguration(ccfg1staticWithFilterName).setNodeFilter(new FilterNode(node2));
/** Static cache configuration passed to {@link #node2}; its node filter rejects {@link #nodeClient}. */
protected final CacheConfiguration ccfg2staticWithFilter =
new CacheConfiguration(ccfg2staticWithFilterName).setNodeFilter(new FilterNode(nodeClient));
/** Static cache configuration passed to {@link #nodeClient}; its node filter rejects {@link #node1}. */
protected final CacheConfiguration ccfg3staticWithFilter =
new CacheConfiguration(ccfg3staticWithFilterName).setNodeFilter(new FilterNode(node1));
/** Configuration of the dynamic cache. Not started in this class; presumably started by subclasses (verify). */
protected final CacheConfiguration<Object, Object> ccfgDynamic = new CacheConfiguration<>(ccfgDynamicName);
/**
 * Configuration of the dynamic cache whose node filter rejects {@link #node2}. Not started in this class;
 * presumably started by subclasses (verify).
 */
protected final CacheConfiguration<Object, Object> ccfgDynamicWithFilter =
new CacheConfiguration<>(ccfgDynamicWithFilterName).setNodeFilter(new FilterNode(node2));
/** Names of the six statically configured caches. */
protected final Set<String> staticCacheNames = Sets.newHashSet(
ccfg1staticName, ccfg2staticName, ccfg3staticName,
ccfg1staticWithFilterName, ccfg2staticWithFilterName, ccfg3staticWithFilterName
);
/** Names of the six statically configured caches plus the two dynamic caches. */
protected final Set<String> allCacheNames = Sets.newHashSet(
ccfg1staticName, ccfg2staticName, ccfg3staticName,
ccfg1staticWithFilterName, ccfg2staticWithFilterName, ccfg3staticWithFilterName,
ccfgDynamicName, ccfgDynamicWithFilterName
);
/**
 * {@inheritDoc}
 * <p>
 * On top of the parent configuration: disables auto-activation, enables persistence on a 100 MB default data
 * region, uses {@code name} as the consistent ID, and selects the discovery IP finder. The client node gets
 * {@link #clientIpFinder}, pointed at {@code 127.0.0.1:47501}; every other node gets {@link #vmIpFinder}.
 */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(name);

    TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

    if (nodeClient.equals(name)) {
        // The client finder is (re)pointed at a single fixed address on every call.
        clientIpFinder.setAddresses(Collections.singletonList("127.0.0.1:47501"));

        discoSpi.setIpFinder(clientIpFinder);
    }
    else
        discoSpi.setIpFinder(vmIpFinder);

    DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration();

    dfltRegionCfg.setMaxSize(100L * 1024 * 1024);
    dfltRegionCfg.setPersistenceEnabled(true);

    DataStorageConfiguration storageCfg = new DataStorageConfiguration();

    storageCfg.setDefaultDataRegionConfiguration(dfltRegionCfg);

    cfg.setAutoActivationEnabled(false);
    cfg.setDiscoverySpi(discoSpi);
    cfg.setDataStorageConfiguration(storageCfg);
    cfg.setConsistentId(name);

    return cfg;
}
/**
 * Registers a local event listener on the client node that counts down the given latches when the client
 * disconnect and reconnect events arrive.
 *
 * @param disconnectedLatch Disconnect latch. Counted down when the client disconnect event is received.
 * @param reconnectedLatch Reconnect latch. Counted down when the client reconnect event is received.
 */
protected void addDisconnectListener(
    final CountDownLatch disconnectedLatch,
    final CountDownLatch reconnectedLatch
) {
    IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
        @Override public boolean apply(Event evt) {
            int evtType = evt.type();

            if (evtType == EventType.EVT_CLIENT_NODE_DISCONNECTED) {
                info("Client disconnected");

                disconnectedLatch.countDown();
            }
            else if (evtType == EventType.EVT_CLIENT_NODE_RECONNECTED) {
                info("Client reconnected");

                reconnectedLatch.countDown();
            }

            // Returning true keeps the listener subscribed for subsequent events.
            return true;
        }
    };

    grid(nodeClient).events().localListen(
        lsnr,
        EventType.EVT_CLIENT_NODE_DISCONNECTED,
        EventType.EVT_CLIENT_NODE_RECONNECTED
    );
}
/**
 * Asserts that the cache descriptors registered on the node are exactly the given caches plus one system cache.
 *
 * @param ig Ignite node whose cache descriptors are inspected.
 * @param cacheNames Names of the non-system caches expected on the node.
 */
protected void checkDescriptors(IgniteEx ig, Set<String> cacheNames) {
    Collection<DynamicCacheDescriptor> descs = ig.context().cache().cacheDescriptors().values();

    // One extra descriptor is expected for the system cache.
    assertEquals("Node name: " + ig.name(), cacheNames.size() + 1, descs.size());

    int sysCachesCnt = 0;

    for (DynamicCacheDescriptor desc : descs) {
        String cacheName = desc.cacheName();

        if (CU.isSystemCache(cacheName)) {
            sysCachesCnt++;

            continue;
        }

        assertTrue(cacheName, cacheNames.contains(cacheName));
    }

    assertEquals(1, sysCachesCnt);
}
/**
 * Starts two server nodes ({@link #node1}, {@link #node2}) and the client node ({@link #nodeClient}), each with
 * its own pair of static cache configurations.
 *
 * @param activateLatch Latch the client's discovery listener waits on after a client disconnect event, or
 *      {@code null} to keep the discovery SPI produced by {@link #getConfiguration(String)}.
 * @throws Exception If failed.
 */
protected void startNodes(CountDownLatch activateLatch) throws Exception {
    IgniteConfiguration srv1Cfg = getConfiguration(node1);

    srv1Cfg.setCacheConfiguration(ccfg1static, ccfg1staticWithFilter);

    IgniteConfiguration srv2Cfg = getConfiguration(node2);

    srv2Cfg.setCacheConfiguration(ccfg2static, ccfg2staticWithFilter);

    IgniteConfiguration clientCfg = getConfiguration(nodeClient);

    clientCfg.setCacheConfiguration(ccfg3static, ccfg3staticWithFilter);

    if (activateLatch != null) {
        // Replace the client's discovery SPI with one that blocks on the latch after a disconnect.
        TcpDiscoverySpi awaitSpi = new AwaitTcpDiscoverySpi(activateLatch);

        awaitSpi.setIpFinder(clientIpFinder);

        clientCfg.setDiscoverySpi(awaitSpi);
    }

    startGrid(srv1Cfg);
    startGrid(srv2Cfg);
    startClientGrid(clientCfg);
}
/**
 * Asserts that the static caches are available on both server nodes and on the client. For each node the
 * filtered cache whose node filter rejects that node is not checked.
 */
protected void checkStaticCaches() {
    IgniteEx ig1 = grid(node1);
    IgniteEx ig2 = grid(node2);
    IgniteEx client = grid(nodeClient);

    // ccfg3staticWithFilter rejects node1, so it is not checked here.
    String[] expOnNode1 = {
        ccfg1staticName, ccfg2staticName, ccfg3staticName,
        ccfg1staticWithFilterName, ccfg2staticWithFilterName
    };

    for (String cacheName : expOnNode1)
        Assert.assertNotNull(ig1.cache(cacheName));

    // ccfg1staticWithFilter rejects node2, so it is not checked here.
    String[] expOnNode2 = {
        ccfg1staticName, ccfg2staticName, ccfg3staticName,
        ccfg3staticWithFilterName, ccfg2staticWithFilterName
    };

    for (String cacheName : expOnNode2)
        Assert.assertNotNull(ig2.cache(cacheName));

    // ccfg2staticWithFilter rejects the client node, so it is not checked here.
    String[] expOnClient = {
        ccfg1staticName, ccfg2staticName, ccfg3staticName,
        ccfg3staticWithFilterName, ccfg1staticWithFilterName
    };

    for (String cacheName : expOnClient)
        Assert.assertNotNull(client.cache(cacheName));
}
/**
 * Asserts that all static caches (see {@link #checkStaticCaches()}) and the dynamic caches are available.
 * The filtered dynamic cache is not checked on {@link #node2}, which its node filter rejects.
 */
protected void checkAllCaches() {
    IgniteEx srv1 = grid(node1);
    IgniteEx srv2 = grid(node2);
    IgniteEx cln = grid(nodeClient);

    checkStaticCaches();

    String[] bothDynamic = {ccfgDynamicName, ccfgDynamicWithFilterName};

    for (String cacheName : bothDynamic)
        Assert.assertNotNull(srv1.cache(cacheName));

    Assert.assertNotNull(srv2.cache(ccfgDynamicName));

    for (String cacheName : bothDynamic)
        Assert.assertNotNull(cln.cache(cacheName));
}
/**
 * Asserts that the caches statically configured on the server nodes are absent on every node and that each
 * node holds only the expected cache descriptors.
 *
 * @param checkClientCaches If {@code true}, the caches configured on the client ({@link #ccfg3staticName},
 *      {@link #ccfg3staticWithFilterName}) must be absent as well and no user cache descriptors are expected.
 *      If {@code false}, those two caches are not checked for absence and exactly their descriptors are
 *      expected, in addition to the system cache.
 */
protected void checkOnlySystemCaches(boolean checkClientCaches) {
    IgniteEx ig1 = grid(node1);
    IgniteEx ig2 = grid(node2);
    IgniteEx client = grid(nodeClient);

    Assert.assertNull(ig1.cache(ccfg1staticName));
    Assert.assertNull(ig1.cache(ccfg2staticName));
    Assert.assertNull(ig1.cache(ccfg1staticWithFilterName));
    Assert.assertNull(ig1.cache(ccfg2staticWithFilterName));

    Assert.assertNull(ig2.cache(ccfg1staticName));
    Assert.assertNull(ig2.cache(ccfg2staticName));
    Assert.assertNull(ig2.cache(ccfg2staticWithFilterName));

    Assert.assertNull(client.cache(ccfg1staticName));
    Assert.assertNull(client.cache(ccfg2staticName));
    Assert.assertNull(client.cache(ccfg1staticWithFilterName));

    if (checkClientCaches) {
        Assert.assertNull(ig1.cache(ccfg3staticName));

        Assert.assertNull(ig2.cache(ccfg3staticName));
        Assert.assertNull(ig2.cache(ccfg3staticWithFilterName));

        Assert.assertNull(client.cache(ccfg3staticName));
        Assert.assertNull(client.cache(ccfg3staticWithFilterName));
    }

    // Typed set instead of a raw Set, so the call into checkDescriptors(IgniteEx, Set<String>) is checked.
    Set<String> cachesToCheck = checkClientCaches
        ? Collections.<String>emptySet()
        : Sets.newHashSet(ccfg3staticName, ccfg3staticWithFilterName);

    checkDescriptors(ig1, cachesToCheck);
    checkDescriptors(ig2, cachesToCheck);
    checkDescriptors(client, cachesToCheck);
}
/**
 * Node filter that accepts every node except the one whose consistent ID equals the configured value.
 */
private static class FilterNode implements IgnitePredicate<ClusterNode> {
    /** Consistent ID of the node rejected by this filter. */
    private final String consistentId;

    /**
     * @param id Consistent ID of the node to reject.
     */
    private FilterNode(String id) {
        this.consistentId = id;
    }

    /** {@inheritDoc} */
    @Override public boolean apply(ClusterNode node) {
        Object nodeConsistentId = node.consistentId();

        boolean rejected = consistentId.equals(nodeConsistentId);

        return !rejected;
    }
}
/**
 * Discovery SPI that wraps every listener set on it into an {@link AwaitDiscoverySpiListener} bound to the
 * latch given at construction.
 */
private static class AwaitTcpDiscoverySpi extends TcpDiscoverySpi {
    /** Latch handed over to the wrapping listener. */
    private final CountDownLatch latch;

    /**
     * @param latch Latch the wrapping listener waits on.
     */
    private AwaitTcpDiscoverySpi(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Installs the given listener wrapped into an {@link AwaitDiscoverySpiListener}.
     *
     * @param lsnr Listener to wrap.
     */
    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
        DiscoverySpiListener awaitingLsnr = new AwaitDiscoverySpiListener(latch, lsnr);

        super.setListener(awaitingLsnr);
    }
}
/**
 * Discovery listener decorator. Every call is forwarded to the delegate; after a client disconnect event has
 * been forwarded, the calling thread is additionally blocked until the latch reaches zero.
 */
private static class AwaitDiscoverySpiListener implements DiscoverySpiListener {
    /** Latch awaited after a client disconnect event. */
    private final CountDownLatch latch;

    /** Listener that receives all forwarded calls. */
    private final DiscoverySpiListener delegate;

    /**
     * @param latch Latch awaited after a client disconnect event.
     * @param delegate Delegate.
     */
    private AwaitDiscoverySpiListener(
        CountDownLatch latch,
        DiscoverySpiListener delegate
    ) {
        this.latch = latch;
        this.delegate = delegate;
    }

    /** {@inheritDoc} */
    @Override public void onLocalNodeInitialized(ClusterNode locNode) {
        delegate.onLocalNodeInitialized(locNode);
    }

    /**
     * Forwards the event to the delegate and, for {@code EVT_CLIENT_NODE_DISCONNECTED}, waits on the latch
     * before returning the delegate's future.
     *
     * @throws RuntimeException If the waiting thread is interrupted; the interrupt status is restored and the
     *      {@link InterruptedException} is kept as the cause.
     */
    @Override public IgniteFuture<?> onDiscovery(
        int type,
        long topVer,
        ClusterNode node,
        Collection<ClusterNode> topSnapshot,
        @Nullable Map<Long, Collection<ClusterNode>> topHist,
        @Nullable DiscoverySpiCustomMessage data
    ) {
        // The delegate sees the event first; blocking happens only afterwards.
        IgniteFuture<?> fut = delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, data);

        if (type == EVT_CLIENT_NODE_DISCONNECTED) {
            try {
                System.out.println("Await cluster change state");

                latch.await();
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();

                throw new RuntimeException(e);
            }
        }

        return fut;
    }
}
/**
 * {@inheritDoc}
 * <p>
 * After the parent hook, stops all grids and cleans the persistence directory.
 */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
stopAllGrids();
cleanPersistenceDir();
}
/**
 * {@inheritDoc}
 * <p>
 * After the parent hook, stops all grids and cleans the persistence directory.
 */
@Override protected void afterTest() throws Exception {
    // Must call the parent's teardown hook; the original called super.beforeTest() here by mistake.
    super.afterTest();

    stopAllGrids();
    cleanPersistenceDir();
}
}