blob: 5a00345b84f074778402070a198039a2bb1be9e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.index;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import java.util.Collections;
import java.util.Map;
import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi;
import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.reconnectClientNode;
/**
* Tests for schema exchange between nodes.
*/
public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
/** Node on which filter should be applied (if any). */
private static String filterNodeName;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
filterNodeName = null;
super.afterTest();
}
/**
* Test propagation of empty query schema for static cache.
*
* @throws Exception If failed.
*/
public void testEmptyStatic() throws Exception {
checkEmpty(false);
}
/**
* Test propagation of empty query schema for dynamic cache.
*
* @throws Exception If failed.
*/
public void testEmptyDynamic() throws Exception {
checkEmpty(true);
}
/**
* Check empty metadata propagation.
*
* @param dynamic Dynamic start flag.
* @throws Exception If failed.
*/
private void checkEmpty(boolean dynamic) throws Exception {
IgniteEx node1;
if (dynamic) {
node1 = startNoCache(1);
node1.getOrCreateCache(cacheConfiguration());
}
else
node1 = start(1);
assertTypes(node1);
IgniteEx node2 = start(2, KeyClass.class, ValueClass.class);
assertTypes(node1);
assertTypes(node2);
IgniteEx node3 = start(3, KeyClass.class, ValueClass.class, KeyClass2.class, ValueClass2.class);
assertTypes(node1);
assertTypes(node2);
assertTypes(node3);
}
/**
* Test propagation of non-empty query schema for static cache.
*
* @throws Exception If failed.
*/
public void testNonEmptyStatic() throws Exception {
checkNonEmpty(false);
}
/**
* Test propagation of non-empty query schema for dynamic cache.
*
* @throws Exception If failed.
*/
public void testNonEmptyDynamic() throws Exception {
checkNonEmpty(true);
}
/**
* Check тщт-empty metadata propagation.
*
* @param dynamic Dynamic start flag.
* @throws Exception If failed.
*/
private void checkNonEmpty(boolean dynamic) throws Exception {
IgniteEx node1;
if (dynamic) {
node1 = startNoCache(1);
node1.getOrCreateCache(cacheConfiguration(KeyClass.class, ValueClass.class));
}
else
node1 = start(1, KeyClass.class, ValueClass.class);
assertTypes(node1, ValueClass.class);
IgniteEx node2 = start(2);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
IgniteEx node3 = start(3, KeyClass.class, ValueClass.class, KeyClass2.class, ValueClass2.class);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
assertTypes(node3, ValueClass.class);
}
/**
* Make sure that new metadata can be propagated after destroy.
*
* @throws Exception If failed.
*/
public void testDynamicRestarts() throws Exception {
IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
IgniteEx node2 = startNoCache(2);
IgniteEx node3 = startClientNoCache(3);
IgniteEx node4 = startClientNoCache(4);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
assertTypes(node3);
node3.cache(CACHE_NAME);
assertTypes(node3, ValueClass.class);
// Check restarts from the first node.
node1.destroyCache(CACHE_NAME);
node1.getOrCreateCache(cacheConfiguration());
assertTypes(node1);
assertTypes(node2);
assertTypes(node3);
node1.destroyCache(CACHE_NAME);
node1.getOrCreateCache(cacheConfiguration(KeyClass.class, ValueClass.class,
KeyClass2.class, ValueClass2.class));
assertTypes(node1, ValueClass.class, ValueClass2.class);
assertTypes(node2, ValueClass.class, ValueClass2.class);
assertTypes(node3);
node3.cache(CACHE_NAME);
assertTypes(node3, ValueClass.class, ValueClass2.class);
// Check restarts from the second node.
node2.destroyCache(CACHE_NAME);
node2.getOrCreateCache(cacheConfiguration());
assertTypes(node1);
assertTypes(node2);
assertTypes(node3);
node2.destroyCache(CACHE_NAME);
node2.getOrCreateCache(cacheConfiguration(KeyClass.class, ValueClass.class,
KeyClass2.class, ValueClass2.class));
assertTypes(node1, ValueClass.class, ValueClass2.class);
assertTypes(node2, ValueClass.class, ValueClass2.class);
assertTypes(node3);
node3.cache(CACHE_NAME);
assertTypes(node3, ValueClass.class, ValueClass2.class);
assertTypes(node4);
node4.cache(CACHE_NAME);
assertTypes(node4, ValueClass.class, ValueClass2.class);
// Make sure that joining node observes correct state.
assertTypes(start(5), ValueClass.class, ValueClass2.class);
assertTypes(startNoCache(6), ValueClass.class, ValueClass2.class);
assertTypes(startClient(7), ValueClass.class, ValueClass2.class);
IgniteEx node8 = startClientNoCache(8);
assertTypes(node8);
node8.cache(CACHE_NAME);
assertTypes(node8, ValueClass.class, ValueClass2.class);
}
/**
* Test client join for static cache.
*
* @throws Exception If failed.
*/
public void testClientJoinStatic() throws Exception {
checkClientJoin(false);
}
/**
* Test client join for dynamic cache.
*
* @throws Exception If failed.
*/
public void testClientJoinDynamic() throws Exception {
checkClientJoin(true);
}
/**
* Check client join.
*
* @throws Exception If failed.
*/
private void checkClientJoin(boolean dynamic) throws Exception {
IgniteEx node1;
if (dynamic) {
node1 = startNoCache(1);
node1.getOrCreateCache(cacheConfiguration(KeyClass.class, ValueClass.class));
}
else
node1 = start(1, KeyClass.class, ValueClass.class);
IgniteEx node2 = startClient(2);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
IgniteEx node3 = startClient(3, KeyClass.class, ValueClass.class,
KeyClass2.class, ValueClass2.class);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
assertTypes(node3, ValueClass.class);
IgniteEx node4 = startClientNoCache(4);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
assertTypes(node3, ValueClass.class);
assertTypes(node4);
node4.cache(CACHE_NAME);
assertTypes(node4, ValueClass.class);
}
/**
* Test client cache start (static).
*
* @throws Exception If failed.
*/
public void testClientCacheStartStatic() throws Exception {
checkClientCacheStart(false);
}
/**
* Test client cache start (dynamic).
*
* @throws Exception If failed.
*/
public void testClientCacheStartDynamic() throws Exception {
checkClientCacheStart(true);
}
/**
* Check client cache start.
*
* @throws Exception If failed.
*/
private void checkClientCacheStart(boolean dynamic) throws Exception {
IgniteEx node1 = startNoCache(1);
IgniteEx node2;
if (dynamic) {
node2 = startClientNoCache(2);
node2.getOrCreateCache(cacheConfiguration(KeyClass.class, ValueClass.class));
}
else
node2 = startClient(2, KeyClass.class, ValueClass.class);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
IgniteEx node3 = start(3);
IgniteEx node4 = start(4, KeyClass.class, ValueClass.class, KeyClass2.class, ValueClass2.class);
IgniteEx node5 = startNoCache(5);
IgniteEx node6 = startClient(6);
IgniteEx node7 = startClient(7, KeyClass.class, ValueClass.class, KeyClass2.class, ValueClass2.class);
IgniteEx node8 = startClientNoCache(8);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
assertTypes(node3, ValueClass.class);
assertTypes(node4, ValueClass.class);
assertTypes(node5, ValueClass.class);
assertTypes(node6, ValueClass.class);
assertTypes(node7, ValueClass.class);
assertTypes(node8);
node8.cache(CACHE_NAME);
assertTypes(node8, ValueClass.class);
node2.destroyCache(CACHE_NAME);
node2.getOrCreateCache(
cacheConfiguration(KeyClass.class, ValueClass.class, KeyClass2.class, ValueClass2.class));
assertTypes(node1, ValueClass.class, ValueClass2.class);
assertTypes(node2, ValueClass.class, ValueClass2.class);
assertTypes(node3, ValueClass.class, ValueClass2.class);
assertTypes(node4, ValueClass.class, ValueClass2.class);
assertTypes(node5, ValueClass.class, ValueClass2.class);
assertTypes(node6);
assertTypes(node7);
assertTypes(node8);
node6.cache(CACHE_NAME);
node7.cache(CACHE_NAME);
node8.cache(CACHE_NAME);
assertTypes(node6, ValueClass.class, ValueClass2.class);
assertTypes(node7, ValueClass.class, ValueClass2.class);
assertTypes(node8, ValueClass.class, ValueClass2.class);
}
/**
* Test behavior when node filter is set.
*
* @throws Exception If failed.
*/
public void testNodeFilter() throws Exception {
filterNodeName = getTestIgniteInstanceName(1);
IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
assertTypes(node1, ValueClass.class);
IgniteEx node2 = start(2, KeyClass.class, ValueClass.class);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
IgniteEx node3 = startNoCache(3);
assertTypes(node1, ValueClass.class);
assertTypes(node2, ValueClass.class);
assertTypes(node3);
node3.cache(CACHE_NAME);
assertTypes(node3, ValueClass.class);
}
/**
* Test client reconnect after server restart accompanied by schema change.
*
* @throws Exception If failed.
*/
public void testServerRestartWithNewTypes() throws Exception {
IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
assertTypes(node1, ValueClass.class);
IgniteEx node2 = startClientNoCache(2);
assertTypes(node2);
node2.cache(CACHE_NAME);
assertTypes(node2, ValueClass.class);
stopGrid(1);
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return grid(2).context().clientDisconnected();
}
}, 10_000L));
IgniteFuture reconnFut = null;
try {
node2.cache(CACHE_NAME);
fail();
}
catch (IgniteClientDisconnectedException e) {
reconnFut = e.reconnectFuture();
}
node1 = start(1, KeyClass.class, ValueClass.class, KeyClass2.class, ValueClass2.class);
assertTypes(node1, ValueClass.class, ValueClass2.class);
reconnFut.get();
assertTypes(node2);
node2.cache(CACHE_NAME);
assertTypes(node2, ValueClass.class, ValueClass2.class);
}
/**
* Test client reconnect.
*
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
public void testClientReconnect() throws Exception {
final IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
assertTypes(node1, ValueClass.class);
final IgniteEx node2 = startClientNoCache(2);
assertTypes(node2);
node2.cache(CACHE_NAME);
assertTypes(node2, ValueClass.class);
reconnectClientNode(log, node2, node1, new Runnable() {
@Override public void run() {
assertTrue(node2.context().clientDisconnected());
final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
try {
queryProcessor(node1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
});
assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
}
/**
* Ensure that only provided types exists for the given cache.
*
* @param node Node.
* @param clss Classes.
*/
private static void assertTypes(IgniteEx node, Class... clss) {
Map<String, QueryTypeDescriptorImpl> types = types(node, CACHE_NAME);
if (F.isEmpty(clss))
assertTrue(types.isEmpty());
else {
assertEquals(clss.length, types.size());
for (Class cls : clss) {
String tblName = tableName(cls);
assertTrue(types.containsKey(tblName));
}
}
}
/**
* Start node with the given cache configuration.
*
* @param clss Key-value classes.
* @return Node.
*/
private IgniteEx start(int idx, Class... clss) throws Exception {
return start(idx, false, clss);
}
/**
* Start client node with the given cache configuration.
*
* @param clss Key-value classes.
* @return Node.
*/
private IgniteEx startClient(int idx, Class... clss) throws Exception {
return start(idx, true, clss);
}
/**
* Start node with the given cache configuration.
*
* @param idx Index.
* @param client Client flag.
* @param clss Key-value classes.
* @return Node.
*/
private IgniteEx start(int idx, boolean client, Class... clss) throws Exception {
String name = getTestIgniteInstanceName(idx);
IgniteConfiguration cfg = getConfiguration(name);
cfg.setClientMode(client);
cfg.setLocalHost("127.0.0.1");
cfg.setCacheConfiguration(cacheConfiguration(clss));
cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
if (filterNodeName != null && F.eq(name, filterNodeName))
cfg.setUserAttributes(Collections.singletonMap("AFF_NODE", true));
return (IgniteEx)Ignition.start(cfg);
}
/**
* Start node without cache.
*
* @param idx Index.
* @return Node.
* @throws Exception If failed.
*/
private IgniteEx startNoCache(int idx) throws Exception {
return startNoCache(idx, false);
}
/**
* Start client node without cache.
*
* @param idx Index.
* @return Node.
* @throws Exception If failed.
*/
private IgniteEx startClientNoCache(int idx) throws Exception {
return startNoCache(idx, true);
}
/**
* Start node without cache.
*
* @param idx Index.
* @param client Client mode flag.
* @return Node.
* @throws Exception If failed.
*/
private IgniteEx startNoCache(int idx, boolean client) throws Exception {
String name = getTestIgniteInstanceName(idx);
IgniteConfiguration cfg = getConfiguration(name);
cfg.setClientMode(client);
cfg.setLocalHost("127.0.0.1");
cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
return (IgniteEx)Ignition.start(cfg);
}
/**
* Get cache configuration.
*
* @param clss QUery classes.
* @return Configuration.
*/
@SuppressWarnings("unchecked")
private static CacheConfiguration cacheConfiguration(Class... clss) {
CacheConfiguration ccfg = new CacheConfiguration().setName(CACHE_NAME).setIndexedTypes(clss);
if (filterNodeName != null) {
ccfg.setNodeFilter(new IgnitePredicate<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
return node.attribute("AFF_NODE") != null;
}
});
}
return ccfg;
}
// TODO: Start/stop many nodes with static configs and dynamic start/stop.
/**
* Key class 2.
*/
@SuppressWarnings("unused")
private static class KeyClass2 {
@QuerySqlField
private String keyField2;
}
/**
* Value class 2.
*/
@SuppressWarnings("unused")
private static class ValueClass2 {
@QuerySqlField
private String valField2;
}
}