blob: b42c4132ca17b3f26d8db95f37259b55d8d189ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
/**
* Service proxy test.
*/
public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstractSelfTest {
/**
* {@inheritDoc}
* <p>
* The tests of this class address the regular nodes as {@code grid(0)} .. {@code grid(nodeCount() - 1)};
* extra nodes started by a test are stopped using the indexes that follow.
*
* @return Number of regular nodes, always {@code 4} here.
*/
@Override protected int nodeCount() {
return 4;
}
/**
 * Runs the parent clean-up and then cancels all services through the first node.
 *
 * @throws Exception If the clean-up failed.
 */
@Override protected void afterTest() throws Exception {
    super.afterTest();

    Ignite firstNode = grid(0);

    firstNode.services().cancelAll();
}
/**
 * Deploys a node singleton and checks that ten calls made through a non-sticky proxy
 * are all counted on the calling node and none of them on the remote nodes.
 *
 * @throws Exception If failed.
 */
@Test
public void testNodeSingletonProxy() throws Exception {
    final String svcName = "testNodeSingletonProxy";

    final Ignite node = randomGrid();

    node.services().deployNodeSingleton(svcName, new CounterServiceImpl());

    CounterService cntr = node.services().serviceProxy(svcName, CounterService.class, false);

    int calls = 0;

    while (calls < 10) {
        cntr.increment();

        calls++;
    }

    assertEquals(10, cntr.get());
    assertEquals(10, cntr.localIncrements());

    // Proxy restricted to the local node must report the same number of local increments.
    CounterService locCntr = node.services(node.cluster().forLocal())
        .serviceProxy(svcName, CounterService.class, false);

    assertEquals(10, locCntr.localIncrements());

    // None of the remote instances may have been invoked.
    for (ClusterNode rmtNode : node.cluster().forRemotes().nodes()) {
        CounterService rmtCntr = node.services(node.cluster().forNode(rmtNode))
            .serviceProxy(svcName, CounterService.class, false);

        assertEquals(0, rmtCntr.localIncrements());
    }
}
/**
 * Checks that an exception thrown by a service deployed on the remote nodes reaches the
 * proxy caller as the original {@link ErrorServiceException} carrying its message,
 * i.e. unwrapped from the reflective {@code InvocationTargetException}.
 *
 * @throws Exception If failed.
 */
@SuppressWarnings("ThrowableNotThrown")
@Test
public void testException() throws Exception {
    final String svcName = "errorService";

    Ignite locNode = grid(0);

    locNode.services(locNode.cluster().forRemotes()).deployNodeSingleton(svcName, new ErrorServiceImpl());

    final ErrorService proxy = locNode.services().serviceProxy(svcName, ErrorService.class, false);

    // The call that is expected to fail.
    Callable<Object> failingCall = new Callable<Object>() {
        @Override public Object call() throws Exception {
            proxy.go();

            return null;
        }
    };

    GridTestUtils.assertThrows(log, failingCall, ErrorServiceException.class, "Test exception");
}
/**
 * Deploys a cluster singleton and checks that ten increments made through a sticky proxy
 * are all reflected by the counter value.
 *
 * @throws Exception If failed.
 */
@Test
public void testClusterSingletonProxy() throws Exception {
    final String svcName = "testClusterSingletonProxy";

    Ignite node = randomGrid();

    node.services().deployClusterSingleton(svcName, new CounterServiceImpl());

    // Sticky proxy.
    CounterService cntr = node.services().serviceProxy(svcName, CounterService.class, true);

    final int calls = 10;

    for (int done = 0; done != calls; done++)
        cntr.increment();

    assertEquals(calls, cntr.get());
}
/**
 * Starts extra nodes, deploys a node singleton and checks that the proxy keeps working
 * while the extra nodes are stopped one by one between the calls.
 *
 * @throws Exception If failed.
 */
@Test
public void testMultiNodeProxy() throws Exception {
    Ignite node = randomGrid();

    final int extraCnt = 3;

    startExtraNodes(extraCnt);

    final String svcName = "testMultiNodeProxy";

    node.services().deployNodeSingleton(svcName, new CounterServiceImpl());

    CounterService cntr = node.services().serviceProxy(svcName, CounterService.class, false);

    // The nodes being stopped are addressed by the indexes that follow the regular ones.
    for (int idx = nodeCount(), end = idx + extraCnt; idx < end; idx++) {
        cntr.increment();

        stopGrid(idx);
    }

    assertEquals(extraCnt, cntr.get());
}
/**
 * Deploys a node singleton on the remote nodes only and checks that ten calls made through
 * a non-sticky proxy are distributed: no single remote node gets all of them, while the
 * per-node counts add up to ten.
 *
 * @throws Exception If failed.
 */
@Test
public void testNodeSingletonRemoteNotStickyProxy() throws Exception {
    String name = "testNodeSingletonRemoteNotStickyProxy";

    Ignite ignite = randomGrid();

    // Deploy only on remote nodes.
    ignite.services(ignite.cluster().forRemotes()).deployNodeSingleton(name, new CounterServiceImpl());

    info("Deployed service: " + name);

    // Get local proxy.
    CounterService svc = ignite.services().serviceProxy(name, CounterService.class, false);

    for (int i = 0; i < 10; i++)
        svc.increment();

    assertEquals(10, svc.get());

    int total = 0;

    for (ClusterNode n : ignite.cluster().forRemotes().nodes()) {
        CounterService rmtSvc =
            ignite.services(ignite.cluster().forNode(n)).serviceProxy(name, CounterService.class, false);

        int cnt = rmtSvc.localIncrements();

        // Since the proxy is not sticky, the count on each node must be less than 10.
        assertTrue("Invalid local increments: " + cnt, cnt < 10);

        total += cnt;
    }

    assertEquals(10, total);
}
/**
 * Deploys a node singleton on the remote nodes only and checks that ten calls made through
 * a sticky proxy all land on one node: every remote node reports either ten or zero local
 * increments and the per-node counts add up to ten.
 *
 * @throws Exception If failed.
 */
@Test
public void testNodeSingletonRemoteStickyProxy() throws Exception {
    String name = "testNodeSingletonRemoteStickyProxy";

    Ignite ignite = randomGrid();

    // Deploy only on remote nodes.
    ignite.services(ignite.cluster().forRemotes()).deployNodeSingleton(name, new CounterServiceImpl());

    // Get local proxy.
    CounterService svc = ignite.services().serviceProxy(name, CounterService.class, true);

    for (int i = 0; i < 10; i++)
        svc.increment();

    assertEquals(10, svc.get());

    int total = 0;

    for (ClusterNode n : ignite.cluster().forRemotes().nodes()) {
        CounterService rmtSvc =
            ignite.services(ignite.cluster().forNode(n)).serviceProxy(name, CounterService.class, false);

        int cnt = rmtSvc.localIncrements();

        assertTrue("Invalid local increments: " + cnt, cnt == 10 || cnt == 0);

        // Sum the value that has just been validated instead of querying the node again.
        total += cnt;
    }

    assertEquals(10, total);
}
/**
 * Deploys a cluster singleton through {@code grid(0)} restricted to its local node and checks
 * that proxies obtained on every other node put their entries into that single instance.
 *
 * @throws Exception If failed.
 */
@Test
public void testSingletonProxyInvocation() throws Exception {
    final String name = "testProxyInvocationFromSeveralNodes";

    final Ignite ignite = grid(0);

    // Type arguments match the way the map is used below: Integer keys, String values.
    ignite.services(ignite.cluster().forLocal()).deployClusterSingleton(name, new MapServiceImpl<Integer, String>());

    for (int i = 1; i < nodeCount(); i++) {
        // The last argument is the proxy timeout.
        MapService<Integer, String> svc = grid(i).services()
            .serviceProxy(name, MapService.class, false, 1_000L);

        // Make sure service is a proxy.
        assertFalse(svc instanceof Service);

        svc.put(i, Integer.toString(i));
    }

    // One entry per node except grid(0).
    assertEquals(nodeCount() - 1, ignite.services().serviceProxy(name, MapService.class, false).size());
}
/**
 * Runs the local proxy check with the service statistics switched off.
 *
 * @throws Exception If failed.
 */
@Test
public void testLocalProxyInvocationWithoutStat() throws Exception {
    final boolean statEnabled = false;

    checkLocalProxy(statEnabled);
}
/**
 * Runs the local proxy check with the service statistics switched on.
 *
 * @throws Exception If failed.
 */
@Test
public void testLocalProxyInvocationWithStat() throws Exception {
    final boolean statEnabled = true;

    checkLocalProxy(statEnabled);
}
/**
 * Runs the remote proxy check for a non-sticky proxy with the service statistics switched off.
 *
 * @throws Exception If failed.
 */
@Test
public void testRemoteNotStickProxyInvocationWithoutStat() throws Exception {
    final boolean statEnabled = false;
    final boolean stickyProxy = false;

    checkRemoteProxy(statEnabled, stickyProxy);
}
/**
 * Runs the remote proxy check for a non-sticky proxy with the service statistics switched on.
 *
 * @throws Exception If failed.
 */
@Test
public void testRemoteNotStickyProxyInvocationWithStat() throws Exception {
    final boolean statEnabled = true;
    final boolean stickyProxy = false;

    checkRemoteProxy(statEnabled, stickyProxy);
}
/**
 * Runs the remote proxy check for a sticky proxy with the service statistics switched off.
 *
 * @throws Exception If failed.
 */
@Test
public void testRemoteStickyProxyInvocationWithoutStat() throws Exception {
    final boolean statEnabled = false;
    final boolean stickyProxy = true;

    checkRemoteProxy(statEnabled, stickyProxy);
}
/**
 * Runs the remote proxy check for a sticky proxy with the service statistics switched on.
 *
 * @throws Exception If failed.
 */
@Test
public void testRemoteStickyProxyInvocationWithStat() throws Exception {
    final boolean statEnabled = true;
    final boolean stickyProxy = true;

    checkRemoteProxy(statEnabled, stickyProxy);
}
/**
 * Checks remote service proxy (node singleton) with or without the statistics.
 * <p>
 * Puts {@code nodeCount()} entries through a proxy restricted to the remote nodes and then
 * verifies, node by node, that the remote instances together hold exactly that many entries.
 *
 * @param withStat If {@code true}, enables the service metrics, {@link ServiceConfiguration#setStatisticsEnabled(boolean)}.
 * @param sticky If {@code true}, requests sticky proxy.
 * @throws InterruptedException If thrown while the service is being deployed.
 */
private void checkRemoteProxy(boolean withStat, boolean sticky) throws InterruptedException {
    final String svcName = "remoteServiceTest";

    deployNodeSingleton(svcName, withStat);

    Ignite ignite = grid(0);

    // Get remote proxy.
    MapService<Integer, String> svc = ignite.services(ignite.cluster().forRemotes()).
        serviceProxy(svcName, MapService.class, sticky);

    assertFalse(svc instanceof Service);

    assertTrue(Arrays.asList(svc.getClass().getInterfaces()).contains(MapService.class));

    // Expected value goes first.
    assertEquals(0, svc.size());

    for (int i = 0; i < nodeCount(); i++)
        svc.put(i, Integer.toString(i));

    int size = 0;

    for (ClusterNode n : ignite.cluster().forRemotes().nodes()) {
        MapService<Integer, String> map = ignite.services(ignite.cluster().forNode(n)).
            serviceProxy(svcName, MapService.class, sticky);

        assertFalse(map instanceof Service);

        // Validate the per-node proxy itself, not the proxy obtained before the loop.
        assertTrue(Arrays.asList(map.getClass().getInterfaces()).contains(MapService.class));

        size += map.size();
    }

    assertEquals(nodeCount(), size);
}
/**
 * Checks local service (node singleton) with or without statistics.
 * <p>
 * On every node a proxy is obtained and used to put one entry; afterwards each node's proxy
 * is expected to report exactly one entry.
 *
 * @param withStat If {@code true}, enables the service metrics, {@link ServiceConfiguration#setStatisticsEnabled(boolean)}.
 * @throws Exception If failed.
 */
private void checkLocalProxy(boolean withStat) throws Exception {
    final String svcName = "localProxyTest";

    deployNodeSingleton(svcName, withStat);

    for (int i = 0; i < nodeCount(); i++) {
        final int idx = i;

        final AtomicReference<MapService<Integer, String>> ref = new AtomicReference<>();

        // Wait, because deployNodeSingleton gives no guarantee that the service is already deployed on the node.
        boolean wait = GridTestUtils.waitForCondition(new PA() {
            @Override public boolean apply() {
                MapService<Integer, String> svc = grid(idx)
                    .services()
                    .serviceProxy(svcName, MapService.class, false);

                ref.set(svc);

                return Proxy.isProxyClass(svc.getClass()) &&
                    Arrays.asList(svc.getClass().getInterfaces()).contains(MapService.class);
            }
        }, 2000);

        // Make sure a dynamic proxy implementing MapService was obtained on the node.
        assertTrue("Invalid service instance [srv=" + ref.get() + ", node=" + i + ']', wait);

        ref.get().put(i, Integer.toString(i));
    }

    // Verify every node through its own proxy. This relies, as the puts above do, on the proxy
    // serving calls from the instance deployed on the same node.
    for (int i = 0; i < nodeCount(); i++) {
        MapService<Integer, String> map = grid(i).services().serviceProxy(svcName, MapService.class, false);

        assertEquals(1, map.size());
    }
}
/**
 * Deploys {@link MapServiceImpl} service over the cluster as node singleton: at most one
 * instance per node and {@code nodeCount()} instances in total.
 *
 * @param svcName Service name.
 * @param withStat If {@code true}, enables the service metrics, {@link ServiceConfiguration#setStatisticsEnabled(boolean)}.
 * @throws InterruptedException If thrown while awaiting the partition map exchange.
 */
private void deployNodeSingleton(String svcName, boolean withStat) throws InterruptedException {
    ServiceConfiguration svcCfg = new ServiceConfiguration();

    svcCfg.setName(svcName);
    svcCfg.setMaxPerNodeCount(1);
    svcCfg.setTotalCount(nodeCount());

    // Type arguments match the callers, which use the service as MapService<Integer, String>.
    svcCfg.setService(new MapServiceImpl<Integer, String>());
    svcCfg.setStatisticsEnabled(withStat);

    grid(0).services().deploy(svcCfg);

    awaitPartitionMapExchange();
}
/**
* Simple map service. The tests of this class obtain it through service proxies and
* verify that the proxy class implements this interface.
*
* @param <K> Type of map keys.
* @param <V> Type of map values.
*/
protected interface MapService<K, V> {
/**
* Puts key-value pair into map.
*
* @param key Key.
* @param val Value.
*/
void put(K key, V val);
/**
* Gets value based on key.
*
* @param key Key.
* @return Value stored for the key.
*/
V get(K key);
/**
* Clears map.
*/
void clear();
/**
* @return Map size, i.e. the number of stored key-value pairs.
*/
int size();
}
/**
 * {@link MapService} implementation that keeps the entries in a {@link ConcurrentHashMap}
 * and only prints a message from each service lifecycle callback.
 *
 * @param <K> Type of map keys.
 * @param <V> Type of map values.
 */
protected static class MapServiceImpl<K, V> implements MapService<K, V>, Service {
    /** Underlying map. */
    private final Map<K, V> map = new ConcurrentHashMap<>();

    /** {@inheritDoc} */
    @Override public void put(K key, V val) {
        map.put(key, val);
    }

    /** {@inheritDoc} */
    @Override public V get(K key) {
        return map.get(key);
    }

    /** {@inheritDoc} */
    @Override public void clear() {
        map.clear();
    }

    /** {@inheritDoc} */
    @Override public int size() {
        return map.size();
    }

    /** {@inheritDoc} */
    @Override public void cancel(ServiceContext ctx) {
        X.println("Stopping cache service: " + ctx.name());
    }

    /** {@inheritDoc} */
    @Override public void init(ServiceContext ctx) throws Exception {
        // Message names the same service kind as the cancel/execute messages.
        X.println("Initializing cache service: " + ctx.name());
    }

    /** {@inheritDoc} */
    @Override public void execute(ServiceContext ctx) throws Exception {
        X.println("Executing cache service: " + ctx.name());
    }
}
/**
* Service interface with a single business method that is allowed to throw;
* {@code testException()} calls it through a service proxy.
*/
protected interface ErrorService extends Service {
/**
* Business method invoked through the service proxy.
*
* @throws Exception If the implementation fails.
*/
void go() throws Exception;
}
/**
 * {@link ErrorService} implementation with empty lifecycle callbacks whose business
 * method always fails with {@link ErrorServiceException}.
 */
protected static class ErrorServiceImpl implements ErrorService {
    /** {@inheritDoc} */
    @Override public void init(ServiceContext ctx) throws Exception {
        // Nothing to initialize.
    }

    /** {@inheritDoc} */
    @Override public void execute(ServiceContext ctx) throws Exception {
        // Nothing to execute.
    }

    /** {@inheritDoc} */
    @Override public void cancel(ServiceContext ctx) {
        // Nothing to cancel.
    }

    /**
     * Always fails.
     *
     * @throws Exception Always; the thrown instance is an {@link ErrorServiceException}.
     */
    @Override public void go() throws Exception {
        final String msg = "Test exception";

        throw new ErrorServiceException(msg);
    }
}
/** Checked exception thrown by {@link ErrorServiceImpl#go()}. */
private static class ErrorServiceException extends Exception {
    /** Explicit serial version UID, since every {@link Throwable} is {@link java.io.Serializable}. */
    private static final long serialVersionUID = 0L;

    /**
     * @param msg Error message.
     */
    ErrorServiceException(String msg) {
        super(msg);
    }
}
}