| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.rest.handlers.cache; |
| |
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.cache.processor.EntryProcessorException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
| |
| /** |
| * Tests command handler directly. |
| */ |
| public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest { |
/**
 * Creates the test instance, passing {@code true} to the parent test constructor.
 */
public GridCacheCommandHandlerSelfTest() {
    // NOTE(review): presumably the parent's "start grid" flag - confirm against GridCommonAbstractTest.
    super(true);
}
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration() throws Exception { |
| // Discovery config. |
| TcpDiscoverySpi disco = new TcpDiscoverySpi(); |
| |
| disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); |
| disco.setJoinTimeout(5000); |
| |
| // Cache config. |
| CacheConfiguration cacheCfg = defaultCacheConfiguration(); |
| |
| cacheCfg.setCacheMode(CacheMode.REPLICATED); |
| |
| cacheCfg.setAtomicityMode(atomicityMode()); |
| |
| // Grid config. |
| IgniteConfiguration cfg = super.getConfiguration(); |
| |
| cfg.setLocalHost("127.0.0.1"); |
| |
| ConnectorConfiguration clnCfg = new ConnectorConfiguration(); |
| clnCfg.setHost("127.0.0.1"); |
| |
| cfg.setConnectorConfiguration(clnCfg); |
| cfg.setDiscoverySpi(disco); |
| cfg.setCacheConfiguration(cacheCfg); // Add 'null' cache configuration. |
| |
| return cfg; |
| } |
| |
| /** |
| * |
| * @return CacheAtomicityMode for the cache. |
| */ |
| protected CacheAtomicityMode atomicityMode() { |
| return CacheAtomicityMode.TRANSACTIONAL; |
| } |
| |
| /** |
| * Tests the cache failure during the execution of the CACHE_GET command. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheGetFailsSyncNotify() throws Exception { |
| GridRestCommandHandler hnd = new TestableCacheCommandHandler(grid().context(), "getAsync"); |
| |
| GridRestCacheRequest req = new GridRestCacheRequest(); |
| |
| req.cacheName(DEFAULT_CACHE_NAME); |
| |
| req.command(GridRestCommand.CACHE_GET); |
| |
| req.key("k1"); |
| |
| try { |
| hnd.handleAsync(req).get(); |
| |
| fail("Expected exception not thrown."); |
| } |
| catch (IgniteCheckedException e) { |
| info("Got expected exception: " + e); |
| } |
| } |
| |
| /** |
| * Test cache handler append/prepend commands. |
| * |
| * @throws Exception In case of any exception. |
| */ |
| @Test |
| public void testAppendPrepend() throws Exception { |
| assertEquals("as" + "df", testAppend("as", "df", true)); |
| assertEquals("df" + "as", testAppend("as", "df", false)); |
| |
| List<String> curList = new ArrayList<>(Arrays.asList("a", "b")); |
| List<String> newList = new ArrayList<>(Arrays.asList("b", "c")); |
| |
| assertEquals(Arrays.asList("a", "b", "b", "c"), testAppend(curList, newList, true)); |
| assertEquals(Arrays.asList("b", "c", "a", "b"), testAppend(curList, newList, false)); |
| |
| Set<String> curSet = new HashSet<>(Arrays.asList("a", "b")); |
| Set<String> newSet = new HashSet<>(Arrays.asList("b", "c")); |
| Set<String> resSet = new HashSet<>(Arrays.asList("a", "b", "c")); |
| |
| assertEquals(resSet, testAppend(curSet, newSet, true)); |
| assertEquals(resSet, testAppend(curSet, newSet, false)); |
| assertEquals(resSet, testAppend(newSet, curList, true)); |
| assertEquals(resSet, testAppend(newSet, curList, false)); |
| assertEquals(resSet, testAppend(curSet, newList, true)); |
| assertEquals(resSet, testAppend(curSet, newList, false)); |
| |
| Map<String, String> curMap = F.asMap("a", "1", "b", "2", "c", "3"); |
| Map<String, String> newMap = F.asMap("a", "#", "b", null, "c", "%", "d", "4"); |
| |
| assertEquals(F.asMap("a", "#", "c", "%", "d", "4"), testAppend(curMap, newMap, true)); |
| assertEquals(F.asMap("a", "1", "b", "2", "c", "3", "d", "4"), testAppend(curMap, newMap, false)); |
| |
| try { |
| testAppend("as", Arrays.asList("df"), true); |
| |
| fail("Expects failed with incompatible types message."); |
| } |
| catch (IgniteCheckedException e) { |
| info("Got expected exception: " + e); |
| |
| e.printStackTrace(); |
| |
| assertTrue(e.getMessage().startsWith("Incompatible types")); |
| } |
| } |
| |
| /** |
| * Test cache handler append/prepend commands with specified environment. |
| * |
| * @param curVal Current value in cache. |
| * @param newVal New value to append/prepend. |
| * @param append Append or prepend flag. |
| * @param <T> Cache value type. |
| * @return Resulting value in cache. |
| * @throws IgniteCheckedException In case of any grid exception. |
| */ |
| private <T> T testAppend(T curVal, T newVal, boolean append) throws IgniteCheckedException, EntryProcessorException { |
| GridRestCommandHandler hnd = new GridCacheCommandHandler(((IgniteKernal)grid()).context()); |
| |
| String key = UUID.randomUUID().toString(); |
| |
| GridRestCacheRequest req = new GridRestCacheRequest(); |
| |
| req.cacheName(DEFAULT_CACHE_NAME); |
| |
| req.command(append ? GridRestCommand.CACHE_APPEND : GridRestCommand.CACHE_PREPEND); |
| |
| req.key(key); |
| req.value(newVal); |
| |
| assertFalse("Expects failure due to no value in cache.", (Boolean)hnd.handleAsync(req).get().getResponse()); |
| |
| T res; |
| |
| try { |
| // Change cache state. |
| jcache().put(key, curVal); |
| |
| // Validate behavior for initialized cache (has current value). |
| assertTrue((Boolean)hnd.handleAsync(req).get().getResponse()); |
| } |
| finally { |
| res = (T)jcache().getAndRemove(key); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Tests the execution of the CACHE_CLEAR command. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCacheClear() throws Exception { |
| GridRestCommandHandler hnd = new GridCacheCommandHandler((grid()).context()); |
| |
| HashMap<Object, Object> caches = new HashMap<>(); |
| caches.put(DEFAULT_CACHE_NAME, null); |
| |
| GridRestCacheRequest req = new GridRestCacheRequest(); |
| req.command(GridRestCommand.CACHE_CLEAR); |
| req.values(caches); |
| |
| try { |
| // Change cache state. |
| for (int i = 0; i < 10; i++ ) { |
| jcache().put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); |
| } |
| |
| assertTrue(jcache().size() == 10); |
| |
| assertTrue((Boolean)hnd.handleAsync(req).get().getResponse()); |
| } |
| finally { |
| assertTrue(jcache().size() == 0); |
| } |
| } |
| |
| /** |
| * Test command handler. |
| */ |
| private static class TestableCacheCommandHandler extends GridCacheCommandHandler { |
| /** */ |
| private final String failMtd; |
| |
| /** |
| * Constructor. |
| * @param ctx Context. |
| * @param failMtd Method to fail. |
| */ |
| TestableCacheCommandHandler(final GridKernalContext ctx, final String failMtd) { |
| super(ctx); |
| |
| this.failMtd = failMtd; |
| } |
| |
| /** |
| * @param cacheName Name of the cache. |
| * |
| * @return Instance of a Cache proxy. |
| */ |
| @Override protected IgniteInternalCache<Object, Object> localCache(String cacheName) throws IgniteCheckedException { |
| final IgniteInternalCache<Object, Object> cache = super.localCache(cacheName); |
| |
| return (IgniteInternalCache<Object, Object>)Proxy.newProxyInstance(getClass().getClassLoader(), |
| new Class[] {IgniteInternalCache.class}, |
| new InvocationHandler() { |
| @Override public Object invoke(Object proxy, Method mtd, Object[] args) throws Throwable { |
| if (failMtd.equals(mtd.getName())) { |
| IgniteInternalFuture<Object> fut = new GridFinishedFuture<>( |
| new IgniteCheckedException("Operation failed")); |
| |
| return fut; |
| } |
| |
| // Rewriting flagOn result to keep intercepting invocations after it. |
| if ("setSkipStore".equals(mtd.getName())) |
| return proxy; |
| |
| if ("forSubjectId".equals(mtd.getName())) |
| return proxy; |
| |
| if ("keepBinary".equals(mtd.getName())) |
| return proxy; |
| |
| return mtd.invoke(cache, args); |
| } |
| }); |
| } |
| } |
| } |