blob: edb6338a1fb351beaccc62f34ff5f712c75bf9b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.io.Serializable;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.services.ServiceDescriptor;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
/**
* Tests for {@link GridAffinityProcessor}.
*/
@GridCommonTest(group = "Service Processor")
public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbstractTest {
/** Cache name. */
public static final String CACHE_NAME = "testServiceCache";
/** Random generator. */
private static final Random RAND = new Random();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
ServiceConfiguration[] svcs = services();
if (svcs != null)
c.setServiceConfiguration(svcs);
CacheConfiguration cc = new CacheConfiguration(DEFAULT_CACHE_NAME);
cc.setName(CACHE_NAME);
cc.setCacheMode(CacheMode.PARTITIONED);
cc.setBackups(nodeCount());
c.setCacheConfiguration(cc);
return c;
}
/**
* Gets number of nodes.
*
* @return Number of nodes.
*/
protected abstract int nodeCount();
/**
* Gets services configurations.
*
* @return Services configuration.
*/
protected ServiceConfiguration[] services() {
return null;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
assert nodeCount() >= 1;
for (int i = 0; i < nodeCount(); i++)
startGrid(i);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
DummyService.reset();
}
/**
* @throws Exception If failed.
*/
protected void startExtraNodes(int cnt) throws Exception {
for (int i = 0; i < cnt; i++)
startGrid(nodeCount() + i);
}
/** */
protected void startExtraNodes(int servers, int clients) throws Exception {
startExtraNodes(servers);
for (int i = 0; i < clients; i++) {
final String nodeName = getTestIgniteInstanceName(nodeCount() + servers + i);
startClientGrid(nodeName, getConfiguration(nodeName));
}
}
/**
* @throws Exception If failed.
*/
protected void stopExtraNodes(int cnt) throws Exception {
for (int i = 0; i < cnt; i++)
stopGrid(nodeCount() + i);
}
/**
* @return Random grid.
*/
protected IgniteEx randomGrid() {
return grid(RAND.nextInt(nodeCount()));
}
/**
* @throws Exception If failed.
*/
@Test
public void testSameConfigurationOld() throws Exception {
String name = "dupServiceOld";
IgniteServices svcs1 = randomGrid().services().withAsync();
IgniteServices svcs2 = randomGrid().services().withAsync();
svcs1.deployClusterSingleton(name, new DummyService());
IgniteFuture<?> fut1 = svcs1.future();
svcs2.deployClusterSingleton(name, new DummyService());
IgniteFuture<?> fut2 = svcs2.future();
info("Deployed service: " + name);
fut1.get();
info("Finished waiting for service future1: " + name);
// This must succeed without exception because configuration is the same.
fut2.get();
info("Finished waiting for service future2: " + name);
}
/**
* @throws Exception If failed.
*/
@Test
public void testSameConfiguration() throws Exception {
String name = "dupServiceOld";
IgniteServices svcs1 = randomGrid().services();
IgniteServices svcs2 = randomGrid().services();
IgniteFuture<?> fut1 = svcs1.deployClusterSingletonAsync(name, new DummyService());
IgniteFuture<?> fut2 = svcs2.deployClusterSingletonAsync(name, new DummyService());
info("Deployed service: " + name);
fut1.get();
info("Finished waiting for service future1: " + name);
// This must succeed without exception because configuration is the same.
fut2.get();
info("Finished waiting for service future2: " + name);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDifferentConfigurationOld() throws Exception {
String name = "dupServiceOld";
IgniteServices svcs1 = randomGrid().services().withAsync();
IgniteServices svcs2 = randomGrid().services().withAsync();
svcs1.deployClusterSingleton(name, new DummyService());
IgniteFuture<?> fut1 = svcs1.future();
svcs2.deployNodeSingleton(name, new DummyService());
IgniteFuture<?> fut2 = svcs2.future();
info("Deployed service: " + name);
fut1.get();
info("Finished waiting for service future: " + name);
try {
fut2.get();
fail("Failed to receive mismatching configuration exception.");
}
catch (IgniteException e) {
info("Received mismatching configuration exception: " + e.getMessage());
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDifferentConfiguration() throws Exception {
String name = "dupService";
IgniteServices svcs1 = randomGrid().services();
IgniteServices svcs2 = randomGrid().services();
IgniteFuture<?> fut1 = svcs1.deployClusterSingletonAsync(name, new DummyService());
IgniteFuture<?> fut2 = svcs2.deployNodeSingletonAsync(name, new DummyService());
info("Deployed service: " + name);
try {
fut1.get();
info("Finished waiting for service future: " + name);
fut2.get();
fail("Failed to receive mismatching configuration exception.");
}
catch (IgniteException e) {
info("Received mismatching configuration exception: " + e.getMessage());
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetServiceByName() throws Exception {
String name = "serviceByName";
Ignite g = randomGrid();
g.services().deployNodeSingleton(name, new DummyService());
DummyService svc = g.services().service(name);
assertNotNull(svc);
Collection<DummyService> svcs = g.services().services(name);
assertEquals(1, svcs.size());
}
/**
* @throws Exception If failed.
*/
@Test
public void testGetServicesByName() throws Exception {
final String name = "servicesByName";
Ignite g = randomGrid();
g.services().deployMultiple(name, new DummyService(), nodeCount() * 2, 3);
GridTestUtils.retryAssert(log, 50, 200, new CA() {
@Override public void apply() {
int cnt = 0;
for (int i = 0; i < nodeCount(); i++) {
Collection<DummyService> svcs = grid(i).services().services(name);
if (svcs != null)
cnt += svcs.size();
}
assertEquals(nodeCount() * 2, cnt);
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployOnEachNodeOld() throws Exception {
Ignite g = randomGrid();
String name = "serviceOnEachNodeOld";
CountDownLatch latch = new CountDownLatch(nodeCount());
DummyService.exeLatch(name, latch);
IgniteServices svcs = g.services().withAsync();
svcs.deployNodeSingleton(name, new DummyService());
IgniteFuture<?> fut = svcs.future();
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, nodeCount(), DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), nodeCount());
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployOnEachNode() throws Exception {
Ignite g = randomGrid();
String name = "serviceOnEachNode";
CountDownLatch latch = new CountDownLatch(nodeCount());
DummyService.exeLatch(name, latch);
IgniteFuture<?> fut = g.services().deployNodeSingletonAsync(name, new DummyService());
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, nodeCount(), DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), nodeCount());
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeploySingletonOld() throws Exception {
Ignite g = randomGrid();
String name = "serviceSingletonOld";
CountDownLatch latch = new CountDownLatch(1);
DummyService.exeLatch(name, latch);
IgniteServices svcs = g.services().withAsync();
svcs.deployClusterSingleton(name, new DummyService());
IgniteFuture<?> fut = svcs.future();
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, 1, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), 1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeploySingleton() throws Exception {
Ignite g = randomGrid();
String name = "serviceSingleton";
CountDownLatch latch = new CountDownLatch(1);
DummyService.exeLatch(name, latch);
IgniteFuture<?> fut = g.services().deployClusterSingletonAsync(name, new DummyService());
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, 1, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), 1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityDeployOld() throws Exception {
Ignite g = randomGrid();
final Integer affKey = 1;
// Store a cache key.
g.cache(CACHE_NAME).put(affKey, affKey.toString());
String name = "serviceAffinityOld";
IgniteServices svcs = g.services().withAsync();
svcs.deployKeyAffinitySingleton(name, new AffinityService(affKey),
CACHE_NAME, affKey);
IgniteFuture<?> fut = svcs.future();
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
checkCount(name, g.services().serviceDescriptors(), 1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityDeploy() throws Exception {
Ignite g = randomGrid();
final Integer affKey = 1;
// Store a cache key.
g.cache(CACHE_NAME).put(affKey, affKey.toString());
String name = "serviceAffinity";
IgniteFuture<?> fut = g.services().deployKeyAffinitySingletonAsync(name, new AffinityService(affKey),
CACHE_NAME, affKey);
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
checkCount(name, g.services().serviceDescriptors(), 1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployMultiple1Old() throws Exception {
Ignite g = randomGrid();
String name = "serviceMultiple1Old";
CountDownLatch latch = new CountDownLatch(nodeCount() * 2);
DummyService.exeLatch(name, latch);
IgniteServices svcs = g.services().withAsync();
svcs.deployMultiple(name, new DummyService(), nodeCount() * 2, 3);
IgniteFuture<?> fut = svcs.future();
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, nodeCount() * 2, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), nodeCount() * 2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployMultiple1() throws Exception {
Ignite g = randomGrid();
String name = "serviceMultiple1";
CountDownLatch latch = new CountDownLatch(nodeCount() * 2);
DummyService.exeLatch(name, latch);
IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), nodeCount() * 2, 3);
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, nodeCount() * 2, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), nodeCount() * 2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployMultiple2Old() throws Exception {
Ignite g = randomGrid();
String name = "serviceMultiple2Old";
int cnt = nodeCount() * 2 + 1;
CountDownLatch latch = new CountDownLatch(cnt);
DummyService.exeLatch(name, latch);
IgniteServices svcs = g.services().withAsync();
svcs.deployMultiple(name, new DummyService(), cnt, 3);
IgniteFuture<?> fut = svcs.future();
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, cnt, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), cnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployMultiple2() throws Exception {
Ignite g = randomGrid();
String name = "serviceMultiple2";
int cnt = nodeCount() * 2 + 1;
CountDownLatch latch = new CountDownLatch(cnt);
DummyService.exeLatch(name, latch);
IgniteFuture<?> fut = g.services().deployMultipleAsync(name, new DummyService(), cnt, 3);
info("Deployed service: " + name);
fut.get();
info("Finished waiting for service future: " + name);
latch.await();
assertEquals(name, cnt, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
checkCount(name, g.services().serviceDescriptors(), cnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelSingleton() throws Exception {
Ignite g = randomGrid();
String name = "serviceCancel";
CountDownLatch latch = new CountDownLatch(1);
DummyService.exeLatch(name, latch);
g.services().deployClusterSingleton(name, new DummyService());
info("Deployed service: " + name);
latch.await();
assertEquals(name, 1, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
latch = new CountDownLatch(1);
DummyService.cancelLatch(name, latch);
g.services().cancel(name);
info("Cancelled service: " + name);
latch.await();
assertEquals(name, 1, DummyService.started(name));
assertEquals(name, 1, DummyService.cancelled(name));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelSingletonAsync() throws Exception {
Ignite g = randomGrid();
String name = "serviceCancelAsync";
CountDownLatch latch = new CountDownLatch(1);
DummyService.exeLatch(name, latch);
g.services().deployClusterSingleton(name, new DummyService());
info("Deployed service: " + name);
latch.await();
assertEquals(name, 1, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
latch = new CountDownLatch(1);
DummyService.cancelLatch(name, latch);
g.services().cancelAsync(name).get();
info("Cancelled service: " + name);
latch.await();
assertEquals(name, 1, DummyService.started(name));
assertEquals(name, 1, DummyService.cancelled(name));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelEachNode() throws Exception {
Ignite g = randomGrid();
String name = "serviceCancelEachNode";
CountDownLatch latch = new CountDownLatch(nodeCount());
DummyService.exeLatch(name, latch);
g.services().deployNodeSingleton(name, new DummyService());
info("Deployed service: " + name);
latch.await();
assertEquals(name, nodeCount(), DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
latch = new CountDownLatch(nodeCount());
DummyService.cancelLatch(name, latch);
g.services().cancel(name);
info("Cancelled service: " + name);
latch.await();
assertEquals(name, nodeCount(), DummyService.started(name));
assertEquals(name, nodeCount(), DummyService.cancelled(name));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelAsyncEachNode() throws Exception {
Ignite g = randomGrid();
String name = "serviceCancelEachNodeAsync";
CountDownLatch latch = new CountDownLatch(nodeCount());
DummyService.exeLatch(name, latch);
g.services().deployNodeSingleton(name, new DummyService());
info("Deployed service: " + name);
latch.await();
assertEquals(name, nodeCount(), DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
latch = new CountDownLatch(nodeCount());
DummyService.cancelLatch(name, latch);
g.services().cancelAsync(name).get();
info("Cancelled service: " + name);
latch.await();
assertEquals(name, nodeCount(), DummyService.started(name));
assertEquals(name, nodeCount(), DummyService.cancelled(name));
}
/**
* @param svcName Service name.
* @param ignite Ignite instance.
* @param cnt Expected count.
*/
protected void checkCount(String svcName, IgniteEx ignite, int cnt) throws IgniteInterruptedCheckedException {
AffinityTopologyVersion topVer = ignite.context().discovery().topologyVersionEx();
waitForServicesReadyTopology(ignite, topVer);
assertEquals(cnt, actualCount(svcName, ignite.services().serviceDescriptors()));
}
/**
* @param svcName Service name.
* @param descs Descriptors.
* @param cnt Expected count.
*/
protected void checkCount(String svcName, Iterable<ServiceDescriptor> descs, int cnt) {
assertEquals(cnt, actualCount(svcName, descs));
}
/**
* @param svcName Service name.
* @param descs Descriptors.
* @return Services count.
*/
protected int actualCount(String svcName, Iterable<ServiceDescriptor> descs) {
int sum = 0;
for (ServiceDescriptor d : descs) {
if (d.name().equals(svcName)) {
for (Integer i : d.topologySnapshot().values())
sum += i;
}
}
return sum;
}
/**
* @param srvcName Service name
* @param expectedDeps Expected number of service deployments
*
*/
protected boolean waitForDeployment(final String srvcName, final int expectedDeps) throws IgniteInterruptedCheckedException {
final Ignite g = randomGrid();
return GridTestUtils.waitForCondition(new GridAbsPredicateX() {
@Override public boolean applyx() {
return actualCount(srvcName, g.services().serviceDescriptors()) == expectedDeps;
}
}, 12_000);
}
/**
* Counter service.
*/
protected interface CounterService {
/**
* @return Number of increments happened on the same service instance.
*/
int localIncrements();
/**
* @return Incremented value.
*/
int increment();
/**
* @return Current value.
*/
int get();
}
/**
* @param ignite Ignite instance.
* @param srvcName Affinity service name.
*/
protected void checkAffinityServiceDeployment(Ignite ignite, String srvcName) {
ServiceDescriptor desc = null;
for (ServiceDescriptor d : ignite.services().serviceDescriptors()) {
if (d.name().equals(srvcName)) {
desc = d;
break;
}
}
assertNotNull(desc);
assertEquals(1, desc.topologySnapshot().size());
ClusterNode n = ignite.affinity(desc.cacheName()).mapKeyToNode(desc.affinityKey());
assertNotNull(n);
assertTrue(desc.topologySnapshot().containsKey(n.id()));
}
/**
* Affinity service.
*/
protected static class AffinityService implements Service {
/** */
private static final long serialVersionUID = 0L;
/** Affinity key. */
private final Object affKey;
/** Grid. */
@IgniteInstanceResource
private Ignite g;
/**
* @param affKey Affinity key.
*/
public AffinityService(Object affKey) {
this.affKey = affKey;
}
/** {@inheritDoc} */
@Override public void cancel(ServiceContext ctx) {
// No-op.
}
/** {@inheritDoc} */
@Override public void init(ServiceContext ctx) throws Exception {
X.println("Initializing affinity service for key: " + affKey);
}
/** {@inheritDoc} */
@Override public void execute(ServiceContext ctx) {
X.println("Executing affinity service for key: " + affKey);
}
}
/**
* Counter service implementation.
*/
protected static class CounterServiceImpl implements CounterService, Service {
/** Auto-injected grid instance. */
@IgniteInstanceResource
private Ignite ignite;
/** */
private IgniteCache<String, Value> cache;
/** Cache key. */
private String key;
/** Invocation count. */
private AtomicInteger locIncrements = new AtomicInteger();
/** {@inheritDoc} */
@Override public int localIncrements() {
return locIncrements.get();
}
/** {@inheritDoc} */
@Override public int increment() {
locIncrements.incrementAndGet();
try {
while (true) {
Value val = cache.get(key);
if (val == null) {
Value old = cache.getAndPutIfAbsent(key, val = new Value(0));
if (old != null)
val = old;
}
Value newVal = new Value(val.get() + 1);
if (cache.replace(key, val, newVal))
return newVal.get();
}
}
catch (Exception e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public int get() {
try {
Value val = cache.get(key);
return val == null ? 0 : val.get();
}
catch (Exception e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void cancel(ServiceContext ctx) {
X.println("Stopping counter service: " + ctx.name());
}
/** {@inheritDoc} */
@Override public void init(ServiceContext ctx) throws Exception {
X.println("Initializing counter service: " + ctx.name());
key = ctx.name();
cache = ignite.cache(CACHE_NAME);
}
/** {@inheritDoc} */
@Override public void execute(ServiceContext ctx) throws Exception {
X.println("Executing counter service: " + ctx.name());
}
/**
*
*/
private static class Value implements Serializable {
/** Value. */
private final int v;
/**
* @param v Value.
*/
private Value(int v) {
this.v = v;
}
/**
* @return Value.
*/
int get() {
return v;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
return this == o || o instanceof Value && v == ((Value)o).v;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return v;
}
}
}
}