blob: f4e290d86c0844af02f175b1593a6c48e8f17095 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.rest.handlers.cache;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.cache.processor.EntryProcessorException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
/**
* Tests command handler directly.
*/
public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest {
/**
* Constructor.
*/
public GridCacheCommandHandlerSelfTest() {
super(true);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration() throws Exception {
// Discovery config.
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));
disco.setJoinTimeout(5000);
// Cache config.
CacheConfiguration cacheCfg = defaultCacheConfiguration();
cacheCfg.setCacheMode(CacheMode.REPLICATED);
cacheCfg.setAtomicityMode(atomicityMode());
// Grid config.
IgniteConfiguration cfg = super.getConfiguration();
cfg.setLocalHost("127.0.0.1");
ConnectorConfiguration clnCfg = new ConnectorConfiguration();
clnCfg.setHost("127.0.0.1");
cfg.setConnectorConfiguration(clnCfg);
cfg.setDiscoverySpi(disco);
cfg.setCacheConfiguration(cacheCfg); // Add 'null' cache configuration.
return cfg;
}
/**
*
* @return CacheAtomicityMode for the cache.
*/
protected CacheAtomicityMode atomicityMode() {
return CacheAtomicityMode.TRANSACTIONAL;
}
/**
* Tests the cache failure during the execution of the CACHE_GET command.
*
* @throws Exception If failed.
*/
@Test
public void testCacheGetFailsSyncNotify() throws Exception {
GridRestCommandHandler hnd = new TestableCacheCommandHandler(grid().context(), "getAsync");
GridRestCacheRequest req = new GridRestCacheRequest();
req.cacheName(DEFAULT_CACHE_NAME);
req.command(GridRestCommand.CACHE_GET);
req.key("k1");
try {
hnd.handleAsync(req).get();
fail("Expected exception not thrown.");
}
catch (IgniteCheckedException e) {
info("Got expected exception: " + e);
}
}
/**
* Test cache handler append/prepend commands.
*
* @throws Exception In case of any exception.
*/
@Test
public void testAppendPrepend() throws Exception {
assertEquals("as" + "df", testAppend("as", "df", true));
assertEquals("df" + "as", testAppend("as", "df", false));
List<String> curList = new ArrayList<>(Arrays.asList("a", "b"));
List<String> newList = new ArrayList<>(Arrays.asList("b", "c"));
assertEquals(Arrays.asList("a", "b", "b", "c"), testAppend(curList, newList, true));
assertEquals(Arrays.asList("b", "c", "a", "b"), testAppend(curList, newList, false));
Set<String> curSet = new HashSet<>(Arrays.asList("a", "b"));
Set<String> newSet = new HashSet<>(Arrays.asList("b", "c"));
Set<String> resSet = new HashSet<>(Arrays.asList("a", "b", "c"));
assertEquals(resSet, testAppend(curSet, newSet, true));
assertEquals(resSet, testAppend(curSet, newSet, false));
assertEquals(resSet, testAppend(newSet, curList, true));
assertEquals(resSet, testAppend(newSet, curList, false));
assertEquals(resSet, testAppend(curSet, newList, true));
assertEquals(resSet, testAppend(curSet, newList, false));
Map<String, String> curMap = F.asMap("a", "1", "b", "2", "c", "3");
Map<String, String> newMap = F.asMap("a", "#", "b", null, "c", "%", "d", "4");
assertEquals(F.asMap("a", "#", "c", "%", "d", "4"), testAppend(curMap, newMap, true));
assertEquals(F.asMap("a", "1", "b", "2", "c", "3", "d", "4"), testAppend(curMap, newMap, false));
try {
testAppend("as", Arrays.asList("df"), true);
fail("Expects failed with incompatible types message.");
}
catch (IgniteCheckedException e) {
info("Got expected exception: " + e);
e.printStackTrace();
assertTrue(e.getMessage().startsWith("Incompatible types"));
}
}
/**
* Test cache handler append/prepend commands with specified environment.
*
* @param curVal Current value in cache.
* @param newVal New value to append/prepend.
* @param append Append or prepend flag.
* @param <T> Cache value type.
* @return Resulting value in cache.
* @throws IgniteCheckedException In case of any grid exception.
*/
private <T> T testAppend(T curVal, T newVal, boolean append) throws IgniteCheckedException, EntryProcessorException {
GridRestCommandHandler hnd = new GridCacheCommandHandler(((IgniteKernal)grid()).context());
String key = UUID.randomUUID().toString();
GridRestCacheRequest req = new GridRestCacheRequest();
req.cacheName(DEFAULT_CACHE_NAME);
req.command(append ? GridRestCommand.CACHE_APPEND : GridRestCommand.CACHE_PREPEND);
req.key(key);
req.value(newVal);
assertFalse("Expects failure due to no value in cache.", (Boolean)hnd.handleAsync(req).get().getResponse());
T res;
try {
// Change cache state.
jcache().put(key, curVal);
// Validate behavior for initialized cache (has current value).
assertTrue((Boolean)hnd.handleAsync(req).get().getResponse());
}
finally {
res = (T)jcache().getAndRemove(key);
}
return res;
}
/**
* Tests the execution of the CACHE_CLEAR command.
*
* @throws Exception If failed.
*/
@Test
public void testCacheClear() throws Exception {
GridRestCommandHandler hnd = new GridCacheCommandHandler((grid()).context());
HashMap<Object, Object> caches = new HashMap<>();
caches.put(DEFAULT_CACHE_NAME, null);
GridRestCacheRequest req = new GridRestCacheRequest();
req.command(GridRestCommand.CACHE_CLEAR);
req.values(caches);
try {
// Change cache state.
for (int i = 0; i < 10; i++ ) {
jcache().put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
assertTrue(jcache().size() == 10);
assertTrue((Boolean)hnd.handleAsync(req).get().getResponse());
}
finally {
assertTrue(jcache().size() == 0);
}
}
/**
* Test command handler.
*/
private static class TestableCacheCommandHandler extends GridCacheCommandHandler {
/** */
private final String failMtd;
/**
* Constructor.
* @param ctx Context.
* @param failMtd Method to fail.
*/
TestableCacheCommandHandler(final GridKernalContext ctx, final String failMtd) {
super(ctx);
this.failMtd = failMtd;
}
/**
* @param cacheName Name of the cache.
*
* @return Instance of a Cache proxy.
*/
@Override protected IgniteInternalCache<Object, Object> localCache(String cacheName) throws IgniteCheckedException {
final IgniteInternalCache<Object, Object> cache = super.localCache(cacheName);
return (IgniteInternalCache<Object, Object>)Proxy.newProxyInstance(getClass().getClassLoader(),
new Class[] {IgniteInternalCache.class},
new InvocationHandler() {
@Override public Object invoke(Object proxy, Method mtd, Object[] args) throws Throwable {
if (failMtd.equals(mtd.getName())) {
IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(
new IgniteCheckedException("Operation failed"));
return fut;
}
// Rewriting flagOn result to keep intercepting invocations after it.
if ("setSkipStore".equals(mtd.getName()))
return proxy;
if ("forSubjectId".equals(mtd.getName()))
return proxy;
if ("keepBinary".equals(mtd.getName()))
return proxy;
return mtd.invoke(cache, args);
}
});
}
}
}