blob: f254497096632f8871aeb8cf8ac590bd83c53284 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
/**
* Tests, that binary metadata is registered correctly during the start without extra request to grid.
*/
public class CacheRegisterMetadataLocallyTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
private static final String STATIC_CACHE_NAME = "staticCache";
/** */
private static final String DYNAMIC_CACHE_NAME = "dynamicCache";
/** Holder of sent custom messages. */
private final ConcurrentLinkedQueue<Object> customMessages = new ConcurrentLinkedQueue<>();
/** Holder of sent communication messages. */
private final ConcurrentLinkedQueue<Object> communicationMessages = new ConcurrentLinkedQueue<>();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setDiscoverySpi(new TcpDiscoverySpi() {
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
if (msg instanceof CustomMessageWrapper) {
DiscoveryCustomMessage realMsg = ((CustomMessageWrapper)msg).delegate();
if (realMsg instanceof MetadataUpdateProposedMessage || realMsg instanceof MetadataUpdateAcceptedMessage)
customMessages.add(realMsg);
}
super.sendCustomEvent(msg);
}
});
cfg.setCommunicationSpi(new TcpCommunicationSpi() {
@Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
if (msg instanceof GridIoMessage)
communicationMessages.add(((GridIoMessage)msg).message());
super.sendMessage(node, msg, ackC);
}
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
if (msg instanceof GridIoMessage)
communicationMessages.add(((GridIoMessage)msg).message());
super.sendMessage(node, msg);
}
});
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
cfg.setCacheConfiguration(cacheConfiguration(STATIC_CACHE_NAME, StaticKey.class, StaticValue.class));
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
customMessages.clear();
communicationMessages.clear();
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityKeyRegisteredStaticCache() throws Exception {
Ignite ignite = startGrid(0);
assertEquals("affKey", getAffinityKey(ignite, StaticKey.class));
assertEquals("affKey", getAffinityKey(ignite, StaticValue.class));
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityKeyRegisteredDynamicCache() throws Exception {
Ignite ignite = startGrid(0);
ignite.createCache(cacheConfiguration(DYNAMIC_CACHE_NAME, DynamicKey.class, DynamicValue.class));
assertEquals("affKey", getAffinityKey(ignite, DynamicKey.class));
assertEquals("affKey", getAffinityKey(ignite, DynamicValue.class));
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientFindsValueByAffinityKeyStaticCacheWithoutExtraRequest() throws Exception {
Ignite srv = startGrid(0);
IgniteCache<StaticKey, StaticValue> cache = srv.cache(STATIC_CACHE_NAME);
testClientAndServerFindsValueByAffinityKey(cache, new StaticKey(1), new StaticValue(2));
assertCustomMessages(2); //MetadataUpdateProposedMessage for update schema.
assertCommunicationMessages();
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientFindsValueByAffinityKeyDynamicCacheWithoutExtraRequest() throws Exception {
Ignite srv = startGrid(0);
IgniteCache<DynamicKey, DynamicValue> cache =
srv.createCache(cacheConfiguration(DYNAMIC_CACHE_NAME, DynamicKey.class, DynamicValue.class));
testClientAndServerFindsValueByAffinityKey(cache, new DynamicKey(3), new DynamicValue(4));
//Expected only MetadataUpdateProposedMessage for update schema.
assertCustomMessages(2);
assertCommunicationMessages();
}
/**
* @param ignite Ignite instance.
* @param keyCls Key class.
* @return Name of affinity key field of the given class.
*/
private <K> String getAffinityKey(Ignite ignite, Class<K> keyCls) {
BinaryType binType = ignite.binary().type(keyCls);
return binType.affinityKeyFieldName();
}
/**
* @param cache Cache instance.
* @param key Test key.
* @param val Test value.
* @throws Exception If failed.
*/
private <K, V> void testClientAndServerFindsValueByAffinityKey(IgniteCache<K, V> cache, K key, V val) throws Exception {
cache.put(key, val);
assertTrue(cache.containsKey(key));
Ignite client = startClientGrid("client");
IgniteCache<K, V> clientCache = client.cache(cache.getName());
assertTrue(clientCache.containsKey(key));
Ignite server = startGrid(1);
IgniteCache<K, V> serverCache = server.cache(cache.getName());
assertTrue(serverCache.containsKey(key));
}
/**
* @param name Cache name.
* @param keyCls Key {@link Class}.
* @param valCls Value {@link Class}.
* @param <K> Key type.
* @param <V> Value type.
* @return Cache configuration
*/
private static <K, V> CacheConfiguration<K, V> cacheConfiguration(String name, Class<K> keyCls, Class<V> valCls) {
CacheConfiguration<K, V> cfg = new CacheConfiguration<>(name);
cfg.setQueryEntities(Collections.singleton(new QueryEntity(keyCls, valCls)));
return cfg;
}
/**
* Expecting that "proposed binary metadata"( {@link org.apache.ignite.internal.processors.marshaller.MappingProposedMessage},
* {@link org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage}) will be skipped because
* it should be register locally during the start.
*
* @param expMsgCnt Count of expected messages.
*/
private void assertCustomMessages(int expMsgCnt) {
assertEquals(customMessages.toString(), expMsgCnt, customMessages.size());
customMessages.forEach(cm -> assertTrue(cm.toString(), cm instanceof DynamicCacheChangeBatch || cm instanceof MetadataUpdateProposedMessage));
}
/**
* Expecting that extra request to binary metadata( {@link MetadataRequestMessage}, {@link MetadataResponseMessage})
* will be skipped because it should be register locally during the start.
*/
private void assertCommunicationMessages() {
communicationMessages.forEach(cm ->
assertFalse(cm.toString(), cm instanceof MetadataRequestMessage || cm instanceof MetadataResponseMessage)
);
}
/** */
private static class StaticKey {
/** */
@AffinityKeyMapped
private int affKey;
/**
* @param affKey Affinity key.
*/
StaticKey(int affKey) {
this.affKey = affKey;
}
}
/** */
private static class StaticValue {
/** It doesn't make sense on value class. It it just for checking that value class also register correctly. */
@AffinityKeyMapped
private int affKey;
/**
* @param affKey Affinity key.
*/
StaticValue(int affKey) {
}
}
/** */
private static class DynamicKey {
/** */
@AffinityKeyMapped
private int affKey;
/**
* @param affKey Affinity key.
*/
DynamicKey(int affKey) {
this.affKey = affKey;
}
}
/** */
private static class DynamicValue {
/** It doesn't make sense on value class. It it just for checking that value class also register correctly. */
@AffinityKeyMapped
private int affKey;
/**
* @param affKey Affinity key.
*/
DynamicValue(int affKey) {
this.affKey = affKey;
}
}
}