blob: 1a2d0f5657dcded30de8e313c0ae320bcd38404f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.resources.ServiceContextResource;
import org.apache.ignite.resources.ServiceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Tests service caller context.
*/
@RunWith(Parameterized.class)
public class IgniteServiceCallContextTest extends GridCommonAbstractTest {
/** String attribute name. */
private static final String STR_ATTR_NAME = "str.attr";
/** Binary attribute name. */
private static final String BIN_ATTR_NAME = "bin.attr";
/** Service name. */
private static final String SVC_NAME = "test-svc";
/** Injected service name. */
private static final String SVC_NAME_INJECTED = "test-svc-injected";
/** Nodes count. */
private static final int NODES_CNT = 3;
/** Max number of services per node. */
private static final int SVC_PER_NODE = 2;
/** Flag to deploy single service instance per cluster. */
@Parameterized.Parameter(0)
public boolean clusterSingleton;
/** Whether or not Ignite should always contact the same remote service instance. */
@Parameterized.Parameter(1)
public boolean sticky;
/** {@inheritDoc} */
@Override public void beforeTestsStarted() throws Exception {
startGrids(NODES_CNT - 1);
startClientGrid(NODES_CNT - 1);
}
/** */
@Parameterized.Parameters(name = "clusterSingleton={0}, sticky={1}")
public static Collection<?> parameters() {
return Arrays.asList(new Object[][] {
{false, false},
{false, true},
{true, true},
});
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName).setServiceConfiguration(
serviceCfg(SVC_NAME_INJECTED, true),
serviceCfg(SVC_NAME, clusterSingleton)
);
}
/**
* @param name Service name.
* @param clusterSingleton Flag to deploy single service instance per cluster.
* @return Service configuration.
*/
private ServiceConfiguration serviceCfg(String name, boolean clusterSingleton) {
return new ServiceConfiguration()
.setName(name)
.setTotalCount(clusterSingleton ? 1 : NODES_CNT * SVC_PER_NODE)
.setMaxPerNodeCount(SVC_PER_NODE)
.setService(new TestServiceImpl());
}
/**
* Check proxy creation with an invalid implementation.
*/
@Test
public void testInvalidContextImplementation() {
ServiceCallContext callCtx = new ServiceCallContext() {
@Override public String attribute(String name) {
return null;
}
@Override public byte[] binaryAttribute(String name) {
return null;
}
};
GridTestUtils.assertThrowsAnyCause(log, () -> grid(0).services().serviceProxy(SVC_NAME, TestService.class,
sticky, callCtx), IllegalArgumentException.class, "\"callCtx\" has an invalid type.");
}
/**
* Check context attribute.
*/
@Test
public void testContextAttribute() {
for (int i = 0; i < NODES_CNT; i++) {
String strVal = String.valueOf(i);
byte[] binVal = strVal.getBytes();
TestService proxy = createProxyWithContext(grid(i), strVal, binVal);
assertEquals(strVal, proxy.attribute(false));
assertEquals(strVal, proxy.attribute(true));
assertTrue(Arrays.equals(binVal, proxy.binaryAttribute(false)));
assertTrue(Arrays.equals(binVal, proxy.binaryAttribute(true)));
}
}
/**
* Check context attribute concurrently.
*
* @throws Exception If failed.
*/
@Test
public void testContextAttributeMultithreaded() throws Exception {
Map<TestService, T2<String, byte[]>> proxies = new HashMap<>();
for (int i = 0; i < G.allGrids().size(); i++) {
String strVal1 = String.valueOf(i * 2);
String strVal2 = String.valueOf(i * 2 + 1);
byte[] binVal1 = strVal1.getBytes();
byte[] binVal2 = strVal2.getBytes();
proxies.put(createProxyWithContext(grid(i), strVal1, binVal1), new T2<>(strVal1, binVal1));
proxies.put(createProxyWithContext(grid(i), strVal2, binVal2), new T2<>(strVal2, binVal2));
}
int threadsPerProxy = 2;
CyclicBarrier barrier = new CyclicBarrier(proxies.size() * threadsPerProxy);
GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>();
for (Map.Entry<TestService, T2<String, byte[]>> e : proxies.entrySet()) {
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
barrier.await(getTestTimeout(), TimeUnit.MILLISECONDS);
for (int i = 0; i < 100; i++) {
T2<String, byte[]> expVals = e.getValue();
TestService proxy = e.getKey();
assertEquals(expVals.get1(), proxy.attribute(false));
assertEquals(expVals.get1(), proxy.attribute(true));
assertTrue(Arrays.equals(expVals.get2(), proxy.binaryAttribute(false)));
assertTrue(Arrays.equals(expVals.get2(), proxy.binaryAttribute(true)));
}
return true;
}, threadsPerProxy, "worker");
compFut.add(fut);
}
compFut.markInitialized();
compFut.get(getTestTimeout());
}
/**
* @param node Ignite node.
* @param attrVal String attribute value.
* @param binVal Binary attribute value.
* @return Service proxy instance.
*/
private TestService createProxyWithContext(Ignite node, String attrVal, byte[] binVal) {
ServiceCallContext callCtx = ServiceCallContext.builder()
.put(STR_ATTR_NAME, attrVal)
.put(BIN_ATTR_NAME, binVal)
.build();
return node.services().serviceProxy(SVC_NAME, TestService.class, sticky, callCtx);
}
/** */
private static interface TestService extends Service {
/**
* @param useInjectedSvc Get attribute from the injected service.
* @return Context attribute value.
*/
public String attribute(boolean useInjectedSvc);
/**
* @param useInjectedSvc Get attribute from the injected service.
* @return Context attribute value.
*/
public byte[] binaryAttribute(boolean useInjectedSvc);
}
/** */
private static class TestServiceImpl implements TestService {
/** Injected service. */
@ServiceResource(serviceName = SVC_NAME_INJECTED, proxyInterface = TestService.class, forwardCallerContext = true)
private TestService injected;
/** Service context. */
@ServiceContextResource
private ServiceContext ctx;
/** {@inheritDoc} */
@Override public String attribute(boolean useInjectedSvc) {
assert injected != null;
ServiceCallContext callCtx = ctx.currentCallContext();
return useInjectedSvc ? injected.attribute(false) : callCtx.attribute(STR_ATTR_NAME);
}
/** {@inheritDoc} */
@Override public byte[] binaryAttribute(boolean useInjectedSvc) {
assert injected != null;
ServiceCallContext callCtx = ctx.currentCallContext();
return useInjectedSvc ? injected.binaryAttribute(false) : callCtx.binaryAttribute(BIN_ATTR_NAME);
}
}
}