blob: 832b982e870f510d224abcf38619c4011d40312d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.NoPermissionException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.ServiceContextResource;
import org.apache.ignite.resources.ServiceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceCallInterceptor;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Tests service call interceptor.
*/
@RunWith(Parameterized.class)
public class IgniteServiceCallInterceptorTest extends GridCommonAbstractTest implements Serializable {
/** */
private static final long serialVersionUID = 0L;
/** String attribute name. */
private static final String STR_ATTR_NAME = "str.attr";
/** Injected service name. */
private static final String SVC_NAME_INJECTED = "test-svc-injected";
/** Intercepted service name. */
private static final String SVC_NAME_INTERCEPTED = "test-svc-intercepted";
/** Nodes count. */
private static final int NODES_CNT = 3;
/** Max number of services per node. */
private static final int SVC_PER_NODE = 2;
/** Interceptor call counter. */
private static final AtomicInteger interceptorCallCntr = new AtomicInteger();
/** Service method call counter. */
private static final AtomicInteger svcCallCntr = new AtomicInteger();
/** Ignite client node. */
private static Ignite client;
/** Flag to deploy single service instance per cluster. */
@Parameterized.Parameter(0)
public boolean clusterSingleton;
/** Whether Ignite should always contact the same remote service instance. */
@Parameterized.Parameter(1)
public boolean sticky;
/** {@inheritDoc} */
@Override public void beforeTestsStarted() throws Exception {
startGrids(NODES_CNT - 1);
client = startClientGrid(NODES_CNT - 1);
}
/** */
@Parameterized.Parameters(name = "clusterSingleton={0}, sticky={1}")
public static Collection<?> parameters() {
return Arrays.asList(new Object[][] {
{false, false},
{false, true},
{true, true},
});
}
/** {@inheritDoc} */
@Override public void afterTest() {
grid(0).services().cancelAll();
}
/**
* @param name Service name.
* @param svc Service.
* @param singleton Flag to deploy single service instance per cluster.
* @param interceptors Interceptors.
* @return Service configuration.
*/
private ServiceConfiguration serviceCfg(String name, Service svc, boolean singleton, ServiceCallInterceptor... interceptors) {
return new ServiceConfiguration()
.setName(name)
.setTotalCount(singleton ? 1 : NODES_CNT * SVC_PER_NODE)
.setMaxPerNodeCount(SVC_PER_NODE)
.setService(svc)
.setInterceptors(interceptors);
}
/**
* Validates configuration comparison during service deployment.
*/
@Test
public void testRedeploy() {
ServiceCallInterceptor interceptor = (mtd, args, ctx, next) -> "1";
ServiceConfiguration cfg = serviceCfg(SVC_NAME_INTERCEPTED, new TestServiceImpl(), clusterSingleton, interceptor);
client.services().deploy(cfg);
TestService proxy = client.services().serviceProxy(SVC_NAME_INTERCEPTED, TestService.class, false);
assertEquals("1", proxy.method(0));
// Redeploy with the same configuration.
cfg.setInterceptors(interceptor);
client.services().deploy(cfg);
// Redeploy with different configuration.
cfg.setInterceptors((mtd, args, ctx, next) -> "2");
GridTestUtils.assertThrowsAnyCause(log, () -> {
client.services().deploy(cfg);
return null;
}, ServiceDeploymentException.class, null);
// Redeploy the service with a valid configuration.
client.services().cancel(SVC_NAME_INTERCEPTED);
client.services().deploy(cfg);
assertEquals("2", proxy.method(0));
// Undeploy service.
client.services().cancel(SVC_NAME_INTERCEPTED);
// Deploy with an interceptor that will not be able to deserialize.
cfg.setInterceptors(new CriticalErrorInterceptor());
GridTestUtils.assertThrowsAnyCause(log, () -> {
client.services().deploy(cfg);
return null;
}, ServiceDeploymentException.class, null);
for (int i = 0; i < NODES_CNT - 1; i++)
assertNull("idx=" + i, grid(i).services().service(SVC_NAME_INTERCEPTED));
}
/**
* Checks if an interceptor exception is being passed to the user.
*/
@Test
public void testException() {
interceptorCallCntr.set(0);
svcCallCntr.set(0);
Exception expE = new NoPermissionException("Request is forbidden.");
String ctxVal = "42";
ServiceCallInterceptor security = (mtd, args, ctx, next) -> {
ServiceCallContext callCtx = ctx.currentCallContext();
if (callCtx == null || !ctxVal.equals(callCtx.attribute(STR_ATTR_NAME)))
throw expE;
return next.call();
};
ServiceCallInterceptor second = (mtd, args, ctx, next) -> {
interceptorCallCntr.incrementAndGet();
return next.call();
};
client.services().deployAll(Collections.singletonList(
serviceCfg(SVC_NAME_INTERCEPTED, new TestServiceImpl(), clusterSingleton, security, second)
));
TestService noCtxProxy = client.services().serviceProxy(SVC_NAME_INTERCEPTED, TestService.class, sticky);
GridTestUtils.assertThrowsAnyCause(null, () -> noCtxProxy.method(0), expE.getClass(), expE.getMessage());
assertEquals(0, interceptorCallCntr.get());
assertEquals(0, svcCallCntr.get());
ServiceCallContext callCtx = ServiceCallContext.builder().put(STR_ATTR_NAME, "42").build();
String res = client.services().serviceProxy(SVC_NAME_INTERCEPTED, TestService.class, sticky, callCtx).method(0);
assertEquals("cls=" + TestServiceImpl.class.getSimpleName() + ", ctxVal=" + ctxVal + ", arg=0", res);
assertEquals(1, interceptorCallCntr.get());
assertEquals(1, svcCallCntr.get());
}
/**
* Ensures that the interceptor can completely override the call and prevent the service method invocation.
*/
@Test
public void testBasicInterception() {
svcCallCntr.set(0);
String expMsg = "intercepted";
client.services().deployAll(Collections.singletonList(
serviceCfg(SVC_NAME_INTERCEPTED, new TestServiceImpl(), clusterSingleton, (mtd, args, ctx, next) -> expMsg)
));
TestService svcProxy = client.services().serviceProxy(SVC_NAME_INTERCEPTED, TestService.class, sticky);
assertEquals(expMsg, svcProxy.method(1));
assertEquals(0, svcCallCntr.get());
}
/**
* Checks the call order and parameters of the interceptor in multithreaded mode.
*
* @throws Exception If failed.
*/
@Test
public void testOrder() throws Exception {
int interceptorsCnt = 8;
int threadCnt = 16;
ServiceCallInterceptor[] interceptors = new ServiceCallInterceptor[interceptorsCnt];
for (int i = 0; i < interceptorsCnt; i++)
interceptors[interceptorsCnt - i - 1] = new SvcInterceptor(null, i);
client.services().deployAll(Arrays.asList(
serviceCfg(SVC_NAME_INTERCEPTED, new TestServiceImpl(), clusterSingleton, interceptors),
serviceCfg(SVC_NAME_INJECTED, new TestServiceInjected(), !clusterSingleton, interceptors)
));
ServiceCallContext callCtx1 = ServiceCallContext.builder().put(STR_ATTR_NAME, "ctxVal1").build();
ServiceCallContext callCtx2 = ServiceCallContext.builder().put(STR_ATTR_NAME, "ctxVal2").build();
AtomicInteger cntr = new AtomicInteger();
CyclicBarrier barrier = new CyclicBarrier(threadCnt);
GridTestUtils.runMultiThreaded(() -> {
int arg = cntr.incrementAndGet();
IgniteServices services = grid(arg % NODES_CNT).services();
TestService ctx1Proxy = services.serviceProxy(SVC_NAME_INTERCEPTED, TestService.class, sticky, callCtx1);
TestService ctx2Proxy = services.serviceProxy(SVC_NAME_INTERCEPTED, TestService.class, sticky, callCtx2);
String exoCtx1 = fmtExpResult(TestServiceImpl.class, callCtx1, interceptors.length, "method", null, arg);
String exoCtx2 = fmtExpResult(TestServiceImpl.class, callCtx2, interceptors.length, "method", null, arg);
String exoCtx1inj = fmtExpResult(TestServiceInjected.class, callCtx1, interceptors.length, "method", "injected", arg);
String exoCtx2inj = fmtExpResult(TestServiceInjected.class, callCtx2, interceptors.length, "method", "injected", arg);
barrier.await(getTestTimeout(), TimeUnit.MILLISECONDS);
for (int i = 0; i < 100; i++) {
assertEquals(exoCtx1inj, ctx1Proxy.injected(arg));
assertEquals(exoCtx2inj, ctx2Proxy.injected(arg));
assertEquals(exoCtx1, ctx1Proxy.method(arg));
assertEquals(exoCtx2, ctx2Proxy.method(arg));
}
return null;
}, threadCnt, "worker");
}
/**
* @param cls Service implementation class name.
* @param ctx Service call context.
* @param incpsCnt Count of interceptors.
* @param mtd1 First method that was called.
* @param mtd2 Second method that was called (if one service calls another).
* @param arg Method argument.
* @return Expected result of the method call.
*/
private String fmtExpResult(Class<?> cls, ServiceCallContext ctx, int incpsCnt, String mtd1, String mtd2, int arg) {
SB buf = new SB("cls=" + cls.getSimpleName() + ", ctxVal=" + ctx.attribute(STR_ATTR_NAME) + ", arg=" + arg);
for (String method : new String[] {mtd1, mtd2}) {
if (method == null)
break;
for (int i = 0; i < incpsCnt; i++)
buf.a("; mtd=").a(method).a(", ctxVal=").a(ctx.attribute(STR_ATTR_NAME)).a(", arg=").a(arg).a(", id=").a(i);
}
return buf.toString();
}
/** */
private interface TestService extends Service {
/** */
String method(int arg);
/** */
String injected(int arg);
}
/** */
private static class TestServiceImpl implements TestService {
/** Injected service. */
@ServiceResource(serviceName = SVC_NAME_INJECTED, proxyInterface = TestService.class, forwardCallerContext = true)
private TestService injected;
/** Service context. */
@ServiceContextResource
private ServiceContext ctx;
/** {@inheritDoc} */
@Override public String method(int arg) {
svcCallCntr.incrementAndGet();
ServiceCallContext callCtx = ctx.currentCallContext();
String attrVal = callCtx == null ? null : ctx.currentCallContext().attribute(STR_ATTR_NAME);
return "cls=" + getClass().getSimpleName() + ", ctxVal=" + attrVal + ", arg=" + arg;
}
/** {@inheritDoc} */
@Override public String injected(int arg) {
assert injected != null;
return injected.method(arg);
}
}
/** */
private static class TestServiceInjected extends TestServiceImpl {
/** */
private static final long serialVersionUID = 0L;
}
/** Service call interceptor. */
static class SvcInterceptor implements ServiceCallInterceptor {
/** */
private static final long serialVersionUID = 0L;
/** Ignite. */
@IgniteInstanceResource
private Ignite ignite;
/** Logger. */
@LoggerResource
private IgniteLogger log;
/** Expected method name. */
private final String expMtdName;
/** Interceptor ID. */
private final int id;
/**
* @param expMtdName Expected method name.
* @param id Interceptor ID.
*/
public SvcInterceptor(String expMtdName, int id) {
this.expMtdName = expMtdName;
this.id = id;
}
/** {@inheritDoc} */
@Override public Object invoke(String mtd, Object[] args, ServiceContext ctx, Callable<Object> next) throws Exception {
assert ignite != null;
assert log != null;
if (expMtdName != null)
assertEquals(expMtdName, mtd);
Object arg = args[0];
ServiceCallContext callCtx = ctx.currentCallContext();
assert callCtx != null;
Object res = next.call();
callCtx = ctx.currentCallContext();
assert callCtx != null : res;
String ctxVal = callCtx == null ? null : ctx.currentCallContext().attribute(STR_ATTR_NAME);
assert ctxVal != null;
return res + "; mtd=" + mtd + ", ctxVal=" + ctxVal + ", arg=" + arg + ", id=" + id;
}
}
/** */
private static class CriticalErrorInterceptor implements ServiceCallInterceptor, Externalizable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** */
public CriticalErrorInterceptor() {
// No-op.
}
/** {@inheritDoc} */
@Override public Object invoke(String mtd, Object[] args, ServiceContext ctx,
Callable<Object> next) throws Exception {
return next.call();
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
// No-op.
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
throw new AssertionError("Expected critical error.");
}
}
}