blob: 7c2bb99cacd44893bce26621d1a2fc8fb5fa9e1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Tests service invocation if requested service is not deployed on the local node and the service topology is required.
*/
public class GridServiceProxyTopologyInitializationTest extends GridCommonAbstractTest {
/** Number of the test nodes. */
private static final int NODES_CNT = 2;
/** Name of the service that throws exception during initialization. */
private static final String BROKEN_SRVC = "broken-service";
/** Name of the decent service. */
private static final String DECENT_SRVC = "decent-service";
/** Name of the attribute that shows whether test servise deployment will be skipped on the node. */
private static final String ATTR_SKIP_DEPLOYMENT = "skip-deployment";
/** Latch that indicates whether {@link ServiceSingleNodeDeploymentResultBatch} execution should be proceeded. */
private final CountDownLatch fullMsgUnblockedLatch = new CountDownLatch(1);
/** Latch that indicated whether {@link ServiceSingleNodeDeploymentResultBatch} was received on the remote node. */
private final CountDownLatch fullMsgReceivedLatch = new CountDownLatch(1);
/** Latch that indicated whether {@link ServiceSingleNodeDeploymentResultBatch} was handled on the remote node. */
private final CountDownLatch fullMsgHandledLatch = new CountDownLatch(1);
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (getTestIgniteInstanceName(NODES_CNT - 1).equals(igniteInstanceName)) {
((TestTcpDiscoverySpi)cfg.getDiscoverySpi()).discoveryHook(new DiscoveryHook() {
@Override public void beforeDiscovery(DiscoveryCustomMessage customMsg) {
if (customMsg instanceof ServiceClusterDeploymentResultBatch) {
fullMsgReceivedLatch.countDown();
try {
fullMsgUnblockedLatch.await(getTestTimeout(), MILLISECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Override public void afterDiscovery(DiscoveryCustomMessage customMsg) {
if (customMsg instanceof ServiceClusterDeploymentResultBatch)
fullMsgHandledLatch.countDown();
}
});
cfg.setUserAttributes(Collections.singletonMap(ATTR_SKIP_DEPLOYMENT, true));
}
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
}
/**
* Tests service invocation after its topology is initialized on the local node.
*
* @throws Exception If fails.
*/
@Test
public void testServiceTopologyInitialized() throws Exception {
IgniteEx loc = startGrids(NODES_CNT);
IgniteEx rmt = grid(NODES_CNT - 1);
assertEquals(1, fullMsgUnblockedLatch.getCount());
fullMsgUnblockedLatch.countDown();
assertEquals(1, fullMsgHandledLatch.getCount());
deployServices(loc);
assertTrue(fullMsgHandledLatch.await(getTestTimeout(), MILLISECONDS));
assertThrowsWithCause(
() -> rmt.services().serviceProxy(BROKEN_SRVC, Invoker.class, false).invoke(),
IgniteException.class);
assertTrue(rmt.services().serviceProxy(DECENT_SRVC, Invoker.class, false).invoke());
}
/**
* Tests service invocation before its topology is initialized on the local node.
*
* @throws Exception If fails.
*/
@Test
@SuppressWarnings("Convert2MethodRef")
public void testServiceTopologyInitializationDelayed() throws Exception {
IgniteEx loc = startGrids(NODES_CNT);
IgniteEx rmt = grid(NODES_CNT - 1);
assertEquals(1, fullMsgReceivedLatch.getCount());
deployServices(loc);
assertTrue(fullMsgReceivedLatch.await(getTestTimeout(), MILLISECONDS));
IgniteInternalFuture<Boolean> decentSvcFut = runAsync(() ->
rmt.services().serviceProxy(DECENT_SRVC, Invoker.class, false).invoke());
IgniteInternalFuture<Boolean> brokenSvcFut = runAsync(() ->
rmt.services().serviceProxy(BROKEN_SRVC, Invoker.class, false).invoke());
U.sleep(500);
assertEquals(1, fullMsgUnblockedLatch.getCount());
assertFalse(decentSvcFut.isDone());
assertFalse(brokenSvcFut.isDone());
fullMsgUnblockedLatch.countDown();
assertTrue(decentSvcFut.get(getTestTimeout()));
assertThrowsWithCause(() -> brokenSvcFut.get(getTestTimeout()), IgniteException.class);
}
/**
* Deploys two services. The deployment of one of which fails on all nodes and the other proceeds smoothly.
*
* @param ignite Deployment initiator.
*/
private void deployServices(Ignite ignite) {
assertThrowsWithCause(
() -> ignite.services().deployAll(Arrays.asList(
getServiceConfiguration(BROKEN_SRVC, new TestService(true)),
getServiceConfiguration(DECENT_SRVC, new TestService(false)))),
ServiceDeploymentException.class);
}
/**
* @param name Name of the service.
* @param srvc Instance of the service.
* @return Service configuration.
*/
private ServiceConfiguration getServiceConfiguration(String name, Service srvc) {
return new ServiceConfiguration()
.setName(name)
.setMaxPerNodeCount(1)
.setNodeFilter(node -> node.attribute(ATTR_SKIP_DEPLOYMENT) == null)
.setService(srvc);
}
/**
* Service that can be forced to fail during initialization.
*/
public static class TestService implements Invoker {
/** Wheter exception is thrown during initialisation. */
private final boolean initBroken;
/**
* @param initBroken Wheter exception is thrown during initialisation.
*/
public TestService(boolean initBroken) {
this.initBroken = initBroken;
}
/** {@inheritDoc} */
@Override public void cancel(ServiceContext ctx) {
// No-op.
}
/** {@inheritDoc} */
@Override public void init(ServiceContext ctx) throws Exception {
if (initBroken)
throw new RuntimeException("Expected service initialization failure.");
}
/** {@inheritDoc} */
@Override public void execute(ServiceContext ctx) throws Exception {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean invoke() {
return true;
}
}
/** */
public interface Invoker extends Service {
/** */
public boolean invoke();
}
}