blob: 28961e425d2aecc362dc528e6bfdc1098f903662 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.services.ServiceDescriptor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Test for deployment of multiple configurations at a time.
*/
public class GridServiceProcessorBatchDeploySelfTest extends GridCommonAbstractTest {
/** Number of services to be deployed. */
private static final int NUM_SERVICES = 100;
/** Number of nodes in the test cluster. */
private static final int NUM_NODES = 4;
/** Client node name. */
private static final String CLIENT_NODE_NAME = "client";
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
for (int i = 0; i < NUM_NODES; i++)
startGrid(i);
startClientGrid(CLIENT_NODE_NAME, getConfiguration(CLIENT_NODE_NAME));
DummyService.reset();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployAll() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
CountDownLatch latch = new CountDownLatch(NUM_SERVICES);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
client.services().deployAll(cfgs);
assertTrue("Waiting for services deployment timed out.", latch.await(30, TimeUnit.SECONDS));
assertDeployedServices(client, cfgs);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployAllAsync() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
CountDownLatch latch = new CountDownLatch(NUM_SERVICES);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
IgniteFuture<Void> fut = client.services().deployAllAsync(cfgs);
fut.get();
assertTrue("Waiting for services deployment timed out.", latch.await(30, TimeUnit.SECONDS));
assertDeployedServices(client, cfgs);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployAllTopologyChange() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
final AtomicBoolean finished = new AtomicBoolean();
IgniteInternalFuture<Object> topChangeFut = runTopChanger(finished);
try {
int numServices = 50;
int batchSize = 5;
CountDownLatch latch = new CountDownLatch(numServices);
IgnitePredicate<ClusterNode> depPred = new TestPredicate(getTestIgniteInstanceName());
List<ServiceConfiguration> cfgs = getConfigs(depPred, numServices);
subscribeExeLatch(cfgs, latch);
int from = 0;
while (from < numServices) {
int to = Math.min(numServices, from + batchSize);
client.services().deployAllAsync(cfgs.subList(from, to)).get(5000);
from = to;
}
assertTrue(latch.await(120, TimeUnit.SECONDS));
assertDeployedServices(client, cfgs);
}
finally {
finished.set(true);
}
topChangeFut.get();
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployAllTopologyChangeFail() throws Exception {
final Ignite client = grid(CLIENT_NODE_NAME);
final AtomicBoolean finished = new AtomicBoolean();
IgniteInternalFuture<Object> topChangeFut = runTopChanger(finished);
try {
int numServices = 200;
int batchSize = 5;
CountDownLatch latch = new CountDownLatch(numServices);
IgnitePredicate<ClusterNode> depPred = new TestPredicate(getTestIgniteInstanceName());
List<ServiceConfiguration> cfgs = getConfigs(depPred, numServices);
List<ServiceConfiguration> failingCfgs = new ArrayList<>();
subscribeExeLatch(cfgs, latch);
int from = 0;
while (from < numServices) {
int to = Math.min(numServices, from + batchSize);
List<ServiceConfiguration> cfgsBatch = cfgs.subList(from, to);
ServiceConfiguration failingCfg = cfgsBatch.get(0);
failingCfg.setName(null);
failingCfgs.add(failingCfg);
try {
client.services().deployAllAsync(cfgsBatch).get(5000);
fail("Should never reach here.");
}
catch (ServiceDeploymentException e) {
assertEquals(1, e.getFailedConfigurations().size());
ServiceConfiguration actFailedCfg = copyService(e.getFailedConfigurations().iterator().next());
assertEquals(failingCfg, actFailedCfg);
latch.countDown();
}
from = to;
}
assertTrue(latch.await(120, TimeUnit.SECONDS));
cfgs.removeAll(failingCfgs);
assertDeployedServices(client, cfgs);
}
finally {
finished.set(true);
}
topChangeFut.get();
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployAllFail() throws Exception {
deployAllFail(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDeployAllAsyncFail() throws Exception {
deployAllFail(true);
}
/**
* @param async If {@code true}, then asynchronous method of deployment will be performed.
* @throws Exception If failed.
*/
private void deployAllFail(boolean async) throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
CountDownLatch latch = new CountDownLatch(NUM_SERVICES - 1);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
ServiceConfiguration failingCfg = cfgs.get(cfgs.size() - 1);
failingCfg.setName(null);
assertFailingDeploy(client, async, cfgs, failingCfg);
assertTrue("Waiting for services deployment timed out.", latch.await(30, TimeUnit.SECONDS));
assertDeployedServices(client, cfgs.subList(0, cfgs.size() - 1));
}
/**
* @throws Exception If failed.
*/
@Test
public void testClashingNames() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
CountDownLatch latch = new CountDownLatch(NUM_SERVICES);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
List<ServiceConfiguration> fstBatch = cfgs.subList(0, NUM_SERVICES / 2);
List<ServiceConfiguration> sndBatch = cfgs.subList(NUM_SERVICES / 4, NUM_SERVICES);
IgniteFuture<Void> fstFut = client.services().deployAllAsync(fstBatch);
IgniteFuture<Void> sndFut = client.services().deployAllAsync(sndBatch);
fstFut.get();
sndFut.get();
assertTrue("Waiting for services deployment timed out.", latch.await(30, TimeUnit.SECONDS));
assertDeployedServices(client, cfgs);
}
/**
* @throws Exception If failed.
*/
@Test
public void testClashingNamesFail() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
int numDepSvcs = NUM_SERVICES - 1;
CountDownLatch latch = new CountDownLatch(numDepSvcs);
List<ServiceConfiguration> fstBatch = cfgs.subList(0, NUM_SERVICES / 2);
List<ServiceConfiguration> sndBatch = cfgs.subList(NUM_SERVICES / 4, NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
IgniteFuture<Void> fut = client.services().deployAllAsync(fstBatch);
ServiceConfiguration failingCfg = cfgs.get(NUM_SERVICES - 1);
failingCfg.setName(null);
assertFailingDeploy(client, false, sndBatch, failingCfg);
fut.get();
assertTrue("Waiting for services deployment timed out.", latch.await(30, TimeUnit.SECONDS));
assertDeployedServices(client, cfgs.subList(0, numDepSvcs));
}
/**
* @throws Exception If failed.
*/
@Test
public void testClashingNameDifferentConfig() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
int numDepSvcs = NUM_SERVICES - 1;
CountDownLatch latch = new CountDownLatch(numDepSvcs);
List<ServiceConfiguration> fstBatch = cfgs.subList(0, NUM_SERVICES / 2);
List<ServiceConfiguration> sndBatch = cfgs.subList(NUM_SERVICES / 4, NUM_SERVICES - 1);
subscribeExeLatch(cfgs, latch);
client.services().deployAll(fstBatch);
ServiceConfiguration failingCfg = copyService(cfgs.get(NUM_SERVICES - 1));
// Same name, different config.
failingCfg.setName(fstBatch.get(0).getName());
failingCfg.setTotalCount(fstBatch.get(0).getTotalCount() + 1);
sndBatch.add(failingCfg);
assertFailingDeploy(client, false, sndBatch, failingCfg);
assertTrue("Waiting for services deployment timed out.", latch.await(30, TimeUnit.SECONDS));
assertDeployedServices(client, cfgs.subList(0, numDepSvcs));
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelAll() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
CountDownLatch latch = new CountDownLatch(NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
client.services().deployAll(cfgs);
latch.await(30, TimeUnit.SECONDS);
client.services().cancelAll();
assertDeployedServices(client, Collections.<ServiceConfiguration>emptyList());
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelAllAsync() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
CountDownLatch latch = new CountDownLatch(NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
client.services().deployAll(cfgs);
latch.await(30, TimeUnit.SECONDS);
IgniteFuture<Void> fut = client.services().cancelAllAsync();
fut.get();
assertDeployedServices(client, Collections.<ServiceConfiguration>emptyList());
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelAllTopologyChange() throws Exception {
IgniteEx client = grid(CLIENT_NODE_NAME);
int numServices = 500;
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), numServices);
CountDownLatch latch = new CountDownLatch(numServices);
subscribeExeLatch(cfgs, latch);
client.services().deployAll(cfgs);
latch.await(30, TimeUnit.SECONDS);
final AtomicBoolean finished = new AtomicBoolean();
IgniteInternalFuture<Object> topChangeFut = runTopChanger(finished);
List<String> names = new ArrayList<>();
for (ServiceConfiguration cfg : cfgs)
names.add(cfg.getName());
try {
int batchSize = 5;
int from = 0;
while (from < numServices) {
int to = Math.min(numServices, from + batchSize);
log.info("Trying to cancel services [" + from + ".." + to + ")");
client.services().cancelAllAsync(names.subList(from, to)).get(5000);
from = to;
}
assertDeployedServices(client, Collections.<ServiceConfiguration>emptyList());
}
finally {
finished.set(true);
}
topChangeFut.get();
}
/**
* @throws Exception If failed.
*/
@Test
public void testCancelAllClashingNames() throws Exception {
Ignite client = grid(CLIENT_NODE_NAME);
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), NUM_SERVICES);
CountDownLatch latch = new CountDownLatch(NUM_SERVICES);
subscribeExeLatch(cfgs, latch);
client.services().deployAll(cfgs);
latch.await(30, TimeUnit.SECONDS);
List<String> names = new ArrayList<>();
for (ServiceConfiguration cfg : cfgs)
names.add(cfg.getName());
int batchSize = 5;
int from = 0;
while (from < NUM_SERVICES) {
int to = Math.min(NUM_SERVICES, from + batchSize);
List<String> toCancel = new ArrayList<>(names.subList(from, to));
toCancel.add(toCancel.get(0));
client.services().cancelAll(toCancel);
from = to;
}
assertDeployedServices(client, Collections.<ServiceConfiguration>emptyList());
}
/**
* @param client Client.
* @param async If {@code true}, then async version of deploy method will be used.
* @param cfgs Service configurations.
* @param failingCfg Configuration of the failing service.
* @throws Exception If failed.
*/
private void assertFailingDeploy(Ignite client, boolean async, List<ServiceConfiguration> cfgs,
ServiceConfiguration failingCfg) throws Exception {
IgniteFuture<Void> fut = null;
if (async)
fut = client.services().deployAllAsync(cfgs);
try {
if (async)
fut.get();
else
client.services().deployAll(cfgs);
fail("Should never reach here.");
}
catch (ServiceDeploymentException e) {
info("Expected exception: " + e.getMessage());
Collection<ServiceConfiguration> expFails = Collections.singleton(failingCfg);
Collection<ServiceConfiguration> actFails = e.getFailedConfigurations();
// Some cfgs may be lazy. Construct ServiceConfiguration from them for comparison.
Collection<ServiceConfiguration> actFailsCp = new ArrayList<>(actFails.size());
for (ServiceConfiguration cfg : actFails)
actFailsCp.add(copyService(cfg));
assertEqualsCollections(expFails, actFailsCp);
}
}
/**
* @param cfg Config.
* @return Copy of provided configuration.
*/
private ServiceConfiguration copyService(ServiceConfiguration cfg) {
ServiceConfiguration cfgCp = new ServiceConfiguration();
cfgCp.setName(cfg.getName());
cfgCp.setMaxPerNodeCount(cfg.getMaxPerNodeCount());
cfgCp.setTotalCount(cfg.getTotalCount());
cfgCp.setAffinityKey(cfg.getAffinityKey());
cfgCp.setCacheName(cfg.getCacheName());
cfgCp.setName(cfg.getName());
cfgCp.setService(cfg.getService());
cfgCp.setNodeFilter(cfg.getNodeFilter());
return cfgCp;
}
/**
* @param client Client Ignite instance.
* @param expCfgs Configurations of services that are expected to be deployed.
*/
private void assertDeployedServices(Ignite client, Collection<ServiceConfiguration> expCfgs) {
Set<String> expNames = new HashSet<>();
Set<String> actNames = new HashSet<>();
for (ServiceConfiguration cfg : expCfgs)
expNames.add(cfg.getName());
for (ServiceDescriptor desc : client.services().serviceDescriptors())
actNames.add(desc.name());
assertEquals(expNames, actNames);
}
/**
* @param nodePred Node predicate.
* @param numServices Number of configurations to generate.
* @return Generated services configurations.
*/
private List<ServiceConfiguration> getConfigs(IgnitePredicate<ClusterNode> nodePred, int numServices) {
List<ServiceConfiguration> cfgs = new ArrayList<>(numServices);
for (int i = 0; i < numServices; i++) {
String name = "testService-" + i;
ServiceConfiguration cfg = new ServiceConfiguration();
cfg.setName(name);
cfg.setTotalCount(1);
cfg.setMaxPerNodeCount(1);
cfg.setService(new DummyService());
cfg.setNodeFilter(nodePred);
cfgs.add(cfg);
}
return cfgs;
}
/**
* @param cfgs Configurations.
* @param latch Latch.
*/
private void subscribeExeLatch(List<ServiceConfiguration> cfgs, CountDownLatch latch) {
for (ServiceConfiguration cfg : cfgs)
DummyService.exeLatch(cfg.getName(), latch);
}
/**
* @param finished Finished flag.
* @throws Exception If failed.
* @return Future.
*/
private IgniteInternalFuture<Object> runTopChanger(final AtomicBoolean finished) throws Exception {
return runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
String namePrefix = "extra-node-";
int extraNodesNum = 3;
while (!finished.get()) {
for (int i = 0; i < extraNodesNum; i++)
startGrid(namePrefix + i);
for (int i = 0; i < extraNodesNum; i++)
stopGrid(namePrefix + i);
awaitPartitionMapExchange();
}
return null;
}
});
}
/**
* Test predicate.
*/
private static class TestPredicate implements IgnitePredicate<ClusterNode> {
/** */
private final String namePrefix;
/**
* @param namePrefix Prefix to match instances name.
*/
public TestPredicate(String namePrefix) {
this.namePrefix = namePrefix;
}
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
String gridName = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);
assert gridName != null;
return !node.isClient() && gridName.startsWith(namePrefix);
}
}
}