blob: c06933c1928236bce067b122904584abab76cc0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.integration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.ConnectorMessageInterceptor;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientClosedException;
import org.apache.ignite.internal.client.GridClientCompute;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientData;
import org.apache.ignite.internal.client.GridClientDataConfiguration;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.GridClientFactory;
import org.apache.ignite.internal.client.GridClientFuture;
import org.apache.ignite.internal.client.GridClientNode;
import org.apache.ignite.internal.client.GridClientPredicate;
import org.apache.ignite.internal.client.GridClientProtocol;
import org.apache.ignite.internal.client.GridServerUnreachableException;
import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JETTY_PORT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
* Tests for Java client.
*/
@SuppressWarnings("deprecation")
public abstract class ClientAbstractSelfTest extends GridCommonAbstractTest {
/** Name of the cache that {@code cacheConfiguration} sets up in REPLICATED mode. */
private static final String REPLICATED_CACHE_NAME = "replicated";
/** Name of the cache that {@code cacheConfiguration} sets up in PARTITIONED mode with one backup. */
private static final String PARTITIONED_CACHE_NAME = "partitioned";
/** Local host address assigned to the test node configuration. */
public static final String HOST = "127.0.0.1";
/** Value published through the {@code IGNITE_JETTY_PORT} system property while the test grid starts. */
public static final int JETTY_PORT = 8080;
/** Port assigned to the node's {@link ConnectorConfiguration}. */
public static final int BINARY_PORT = 11212;
/** Path to jetty config. */
public static final String REST_JETTY_CFG = "modules/clients/src/test/resources/jetty/rest-jetty.xml";
/**
 * Objects seen by the connector message interceptor (both received and sent), keyed by themselves.
 * Cleared after every test. Need to be static because configuration inits only once per class.
 */
private static final ConcurrentMap<Object, Object> INTERCEPTED_OBJECTS = new ConcurrentHashMap<>();
/** Cache stores keyed by cache name; created lazily by the cache store factory and accessed under {@code synchronized (cacheStores)}. */
private static final Map<String, HashMapStore> cacheStores = new HashMap<>();
/** Path to a log4j2 test configuration file (not referenced by the code of this class). */
public static final String ROUTER_LOG_CFG = "modules/core/src/test/config/log4j2-test.xml";
/** Suffix appended to received {@code String} messages when {@link #overwriteIntercepted} is set. */
private static final String INTERCEPTED_SUF = "intercepted";
/** Strings forming the default task argument; their lengths (9 + 4 + 4) add up to the 17 expected by the tests. */
private static final String[] TASK_ARGS = new String[] {"executing", "test", "task"};
/**
 * Flag indicating whether intercepted objects should be overwritten.
 * NOTE(review): never assigned in this class, so it keeps its default {@code false} unless set elsewhere - confirm.
 */
private static volatile boolean overwriteIntercepted;
/** Executor created before each test and shut down after it. */
private ExecutorService exec;
/** Client started before each test and stopped after it. */
protected GridClient client;
/**
 * Starts the test grid with the Jetty port pinned to {@link #JETTY_PORT}.
 * <p>
 * The port is passed through the {@code IGNITE_JETTY_PORT} system property, which is
 * JVM-wide state. It is therefore cleared in a {@code finally} block so that a failed
 * grid start does not leak the property into tests that run later in the same JVM.
 *
 * @throws Exception If the grid failed to start.
 */
@Override protected void beforeTestsStarted() throws Exception {
    System.setProperty(IGNITE_JETTY_PORT, Integer.toString(JETTY_PORT));

    try {
        startGrid();
    }
    finally {
        System.clearProperty(IGNITE_JETTY_PORT);
    }
}
/**
 * Prepares per-test resources: a fresh cached thread pool and a freshly started client.
 *
 * @throws Exception If the client could not be started.
 */
@Override protected void beforeTest() throws Exception {
    this.exec = Executors.newCachedThreadPool();

    this.client = client();
}
/**
 * Releases per-test resources and resets all state shared between tests:
 * the executor, the client, the backing cache stores, both caches and the
 * registry of intercepted objects.
 *
 * @throws Exception If cleanup failed.
 */
@Override protected void afterTest() throws Exception {
    U.shutdownNow(ClientAbstractSelfTest.class, exec, log);

    exec = null;

    if (client != null) {
        GridClientFactory.stop(client.id(), true);
    }

    client = null;

    // Wipe the store contents first, then the caches on top of them.
    synchronized (cacheStores) {
        Collection<HashMapStore> stores = cacheStores.values();

        for (HashMapStore store : stores) {
            store.map.clear();
        }
    }

    for (String cacheName : new String[] {PARTITIONED_CACHE_NAME, REPLICATED_CACHE_NAME})
        grid().cache(cacheName).clear();

    INTERCEPTED_OBJECTS.clear();
}
/**
 * Gets protocol which should be used in client connection.
 * The returned value is set on the client configuration built by {@code clientConfiguration()}.
 *
 * @return Protocol.
 */
protected abstract GridClientProtocol protocol();
/**
 * Gets server address to which client should connect.
 * The returned value is the only server registered in the client configuration.
 *
 * @return Server address in format "host:port".
 */
protected abstract String serverAddress();
/**
 * Tells whether SSL is enabled on both the node connector and the client.
 *
 * @return Whether SSL should be used in test.
 */
protected abstract boolean useSsl();
/**
 * Provides the SSL context factory; within this class it is only requested when {@link #useSsl()} returns {@code true}.
 *
 * @return SSL context factory used in test.
 */
protected abstract GridSslContextFactory sslContextFactory();
/**
 * Provides the name of the task submitted by the execution tests.
 *
 * @return Class name of {@link TestTask}.
 */
protected String getTaskName() {
    final Class<?> taskCls = TestTask.class;

    return taskCls.getName();
}
/**
 * Provides the name of the long-running task submitted by the shutdown tests.
 *
 * @return Class name of {@link SleepTestTask}.
 */
protected String getSleepTaskName() {
    final Class<?> taskCls = SleepTestTask.class;

    return taskCls.getName();
}
/**
 * Provides the argument passed to the test tasks.
 *
 * @return Fixed-size list view over the {@code TASK_ARGS} array.
 */
protected Object getTaskArgument() {
    List<String> args = Arrays.asList(TASK_ARGS);

    return args;
}
/**
 * Builds the node configuration used by the test grid: binds it to {@code HOST},
 * installs a connector on {@code BINARY_PORT} (with SSL when requested), registers
 * the replicated and partitioned caches and attaches a message interceptor that
 * records every non-null object passing through the connector.
 *
 * @param igniteInstanceName Ignite instance name.
 * @return Node configuration.
 * @throws Exception In case of error.
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);

    igniteCfg.setLocalHost(HOST);

    assert igniteCfg.getConnectorConfiguration() == null;

    ConnectorConfiguration connCfg = new ConnectorConfiguration();

    connCfg.setPort(BINARY_PORT);

    if (useSsl()) {
        connCfg.setSslEnabled(true);
        connCfg.setSslContextFactory(sslContextFactory());
    }

    igniteCfg.setConnectorConfiguration(connCfg);

    igniteCfg.setCacheConfiguration(
        cacheConfiguration(REPLICATED_CACHE_NAME),
        cacheConfiguration(PARTITIONED_CACHE_NAME));

    connCfg.setMessageInterceptor(new ConnectorMessageInterceptor() {
        /** {@inheritDoc} */
        @Override public Object onReceive(@Nullable Object obj) {
            if (obj != null)
                INTERCEPTED_OBJECTS.put(obj, obj);

            // Only String payloads are rewritten, and only when overwriting is switched on.
            if (overwriteIntercepted && obj instanceof String)
                return obj + INTERCEPTED_SUF;

            return obj;
        }

        /** {@inheritDoc} */
        @Override public Object onSend(Object obj) {
            if (obj != null)
                INTERCEPTED_OBJECTS.put(obj, obj);

            return obj;
        }
    });

    return igniteCfg;
}
/**
 * Creates a read-through/write-through cache configuration backed by a {@link HashMapStore}.
 * The cache named {@code REPLICATED_CACHE_NAME} is replicated; any other name yields a
 * partitioned cache with one backup.
 *
 * @param cacheName Cache name.
 * @return Cache configuration.
 * @throws Exception In case of error.
 */
@SuppressWarnings("unchecked")
private static CacheConfiguration cacheConfiguration(@NotNull final String cacheName) throws Exception {
    CacheConfiguration ccfg = defaultCacheConfiguration();

    if (REPLICATED_CACHE_NAME.equals(cacheName))
        ccfg.setCacheMode(REPLICATED);
    else
        ccfg.setCacheMode(PARTITIONED);

    ccfg.setName(cacheName);

    // One store instance per cache name, created on first request and reused afterwards.
    Factory<CacheStore> storeFactory = new Factory<CacheStore>() {
        @Override public CacheStore create() {
            synchronized (cacheStores) {
                if (!cacheStores.containsKey(cacheName))
                    cacheStores.put(cacheName, new HashMapStore());

                return cacheStores.get(cacheName);
            }
        }
    };

    ccfg.setCacheStoreFactory(storeFactory);

    ccfg.setWriteThrough(true);
    ccfg.setReadThrough(true);
    ccfg.setLoadPreviousValue(true);

    if (PARTITIONED == ccfg.getCacheMode())
        ccfg.setBackups(1);

    return ccfg;
}
/**
 * Starts a new client using {@link #clientConfiguration()}.
 *
 * @return Client.
 * @throws GridClientException In case of error.
 */
protected GridClient client() throws GridClientException {
    GridClientConfiguration clientCfg = clientConfiguration();

    return GridClientFactory.start(clientCfg);
}
/**
 * Assembles the client configuration: an unnamed and a partitioned data configuration,
 * the protocol and server address supplied by the concrete test, a dedicated executor
 * and, when SSL is enabled, the SSL context factory.
 *
 * @return Test client configuration.
 * @throws GridClientException In case of error.
 */
protected GridClientConfiguration clientConfiguration() throws GridClientException {
    GridClientDataConfiguration unnamedData = new GridClientDataConfiguration();

    GridClientDataConfiguration partitionedData = new GridClientDataConfiguration();

    partitionedData.setName(PARTITIONED_CACHE_NAME);

    // Numbered thread names make the client worker threads easy to spot while debugging.
    ThreadFactory workerFactory = new ThreadFactory() {
        private AtomicInteger cntr = new AtomicInteger();

        /** {@inheritDoc} */
        @Override public Thread newThread(Runnable r) {
            String threadName = "client-worker-thread-" + cntr.getAndIncrement();

            return new Thread(r, threadName);
        }
    };

    GridClientConfiguration clientCfg = new GridClientConfiguration();

    clientCfg.setDataConfigurations(Arrays.asList(unnamedData, partitionedData));
    clientCfg.setProtocol(protocol());
    clientCfg.setServers(Collections.singleton(serverAddress()));

    // Custom executor avoids failures on client shutdown.
    clientCfg.setExecutorService(Executors.newCachedThreadPool(workerFactory));

    if (useSsl()) {
        clientCfg.setSslContextFactory(sslContextFactory());
    }

    return clientCfg;
}
/**
 * Checks that the first node returned by a topology refresh reports itself as connectable.
 * <p>
 * The test starts its own client in addition to the one held in the {@code client} field.
 * {@code afterTest()} only stops the field instance, so the extra client is stopped here
 * to avoid leaking its connections and worker threads.
 *
 * @throws Exception If failed.
 */
@Test
public void testConnectable() throws Exception {
    GridClient client = client();

    try {
        List<GridClientNode> nodes = client.compute().refreshTopology(false, false);

        assertTrue(F.first(nodes).connectable());
    }
    finally {
        GridClientFactory.stop(client.id(), true);
    }
}
/**
 * Verifies that the asynchronous API never throws from the call itself: compute requests
 * are issued against an empty projection, data requests against an already stopped client,
 * and the resulting futures are only allowed to fail with "server unreachable" or
 * "client closed" errors.
 *
 * @throws Exception If failed.
 */
@Test
public void testNoAsyncExceptions() throws Exception {
    GridClient cli = client();

    GridClientData data = cli.data(PARTITIONED_CACHE_NAME);

    // Predicate rejecting every node, which makes the compute projection empty.
    GridClientPredicate<GridClientNode> rejectAll = new GridClientPredicate<GridClientNode>() {
        @Override public boolean apply(GridClientNode e) {
            return false;
        }
    };

    GridClientCompute compute = cli.compute().projection(rejectAll);

    Map<String, GridClientFuture<?>> pending = new LinkedHashMap<>();

    pending.put("exec", compute.executeAsync("taskName", "taskArg"));
    pending.put("affExec", compute.affinityExecuteAsync("taskName", "cacheName", "affKey", "taskArg"));
    pending.put("refreshById", compute.refreshNodeAsync(UUID.randomUUID(), true, true));
    pending.put("refreshByIP", compute.refreshNodeAsync("nodeIP", true, true));
    pending.put("refreshTop", compute.refreshTopologyAsync(true, true));

    // Data operations below are issued after the client has been stopped.
    GridClientFactory.stop(cli.id(), false);

    pending.put("put", data.putAsync("key", "val"));
    pending.put("putAll", data.putAllAsync(F.asMap("key", "val")));
    pending.put("get", data.getAsync("key"));
    pending.put("getAll", data.getAllAsync(Collections.singletonList("key")));
    pending.put("remove", data.removeAsync("key"));
    pending.put("removeAll", data.removeAllAsync(Collections.singletonList("key")));
    pending.put("replace", data.replaceAsync("key", "val"));
    pending.put("cas", data.casAsync("key", "val", "val2"));
    pending.put("metrics", data.metricsAsync());

    for (Map.Entry<String, GridClientFuture<?>> entry : pending.entrySet()) {
        String opName = entry.getKey();
        GridClientFuture<?> fut = entry.getValue();

        try {
            fut.get();

            info("Expects '" + opName + "' fails with grid client exception.");
        }
        catch (GridServerUnreachableException | GridClientClosedException ignore) {
            // No op: compute projection is empty.
        }
    }
}
/**
 * Verifies that a waiting client stop lets already submitted sleep tasks complete
 * and deliver their result.
 *
 * @throws Exception If failed.
 */
@Test
public void testGracefulShutdown() throws Exception {
    GridClientCompute compute = client.compute();

    Object arg = getTaskArgument();
    String sleepTask = getSleepTaskName();

    GridClientFuture<Object> first = compute.executeAsync(sleepTask, arg);
    GridClientFuture<Object> second = compute.executeAsync(sleepTask, arg);

    // 'true' makes the stop wait for the pending requests.
    GridClientFactory.stop(client.id(), true);

    Assert.assertEquals(17, first.get());
    Assert.assertEquals(17, second.get());
}
/**
 * Verifies that a non-waiting client stop fails a pending sleep task future
 * with {@link GridClientClosedException}.
 *
 * @throws Exception If failed.
 */
@Test
public void testForceShutdown() throws Exception {
    GridClientCompute compute = client.compute();

    Object arg = getTaskArgument();
    String sleepTask = getSleepTaskName();

    GridClientFuture<Object> pending = compute.executeAsync(sleepTask, arg);

    GridClientFactory.stop(client.id(), false);

    try {
        pending.get();

        // Reaching this line means the future completed normally, which is not expected.
        Assert.fail("Expected GridClientClosedException.");
    }
    catch (GridClientClosedException ignored) {
        // Expected outcome.
    }
}
/**
 * Verifies that a batch of asynchronously submitted tasks completes successfully
 * when the client is stopped with waiting enabled.
 *
 * @throws Exception If failed.
 */
@Test
public void testShutdown() throws Exception {
    GridClient cli = client();

    GridClientCompute compute = cli.compute();

    String taskName = getTaskName();
    Object taskArg = getTaskArgument();

    Collection<GridClientFuture<Object>> batch = new ArrayList<>();

    // Validate connection works.
    compute.execute(taskName, taskArg);

    info(">>> First task executed successfully, running batch.");

    int remaining = 10;

    while (remaining-- > 0) {
        batch.add(compute.executeAsync(taskName, taskArg));
    }

    // Stop client.
    GridClientFactory.stop(cli.id(), true);

    info(">>> Completed stop request.");

    int failedCnt = 0;

    for (GridClientFuture<Object> fut : batch) {
        try {
            assertEquals(17, fut.get());
        }
        catch (GridClientException e) {
            log.warning("Task execution failed.", e);

            failedCnt++;
        }
    }

    assertEquals(0, failedCnt);
}
/**
 * Verifies that the test task yields 17 both through the synchronous and the asynchronous API.
 *
 * @throws Exception If failed.
 */
@Test
public void testExecute() throws Exception {
    GridClientCompute compute = client.compute();

    String taskName = getTaskName();
    Object taskArg = getTaskArgument();

    Integer exp = Integer.valueOf(17);

    Object syncRes = compute.execute(taskName, taskArg);

    assertEquals(exp, syncRes);

    Object asyncRes = compute.executeAsync(taskName, taskArg).get();

    assertEquals(exp, asyncRes);
}
/**
 * Verifies topology refresh through the synchronous and asynchronous API, with and
 * without node attributes and metrics.
 *
 * @throws Exception If failed.
 */
@Test
public void testTopology() throws Exception {
    GridClientCompute compute = client.compute();

    assertSingleNodeTopology(compute.refreshTopology(true, true), true);
    assertSingleNodeTopology(compute.refreshTopology(false, false), false);
    assertSingleNodeTopology(compute.refreshTopologyAsync(true, true).get(), true);
    assertSingleNodeTopology(compute.refreshTopologyAsync(false, false).get(), false);
}

/**
 * Asserts that the topology consists of exactly the local grid node.
 * Null checks run before any dereference, so a broken topology is reported as an
 * assertion failure rather than a {@link NullPointerException}.
 *
 * @param top Topology returned by a refresh request.
 * @param full {@code true} if attributes and metrics were requested and must be present,
 *      {@code false} if they were not requested and must be absent.
 * @throws Exception If failed.
 */
private void assertSingleNodeTopology(List<GridClientNode> top, boolean full) throws Exception {
    assertNotNull(top);
    assertEquals(1, top.size());

    GridClientNode node = F.first(top);

    assertNotNull(node);

    if (full) {
        assertFalse(node.attributes().isEmpty());
        assertNotNull(node.metrics());
    }
    else {
        assertTrue(node.attributes().isEmpty());
        assertNull(node.metrics());
    }

    assertNotNull(node.tcpAddresses());
    assertEquals(grid().localNode().id(), node.nodeId());
}
/**
 * Test task: one job per input string, each reporting the string length; the task result
 * is the sum of all lengths.
 */
private static class TestTask extends ComputeTaskSplitAdapter<List<String>, Integer> {
    /** {@inheritDoc} */
    @Override protected Collection<? extends ComputeJob> split(int gridSize, List<String> list) {
        Collection<ComputeJobAdapter> jobs = new ArrayList<>();

        // No argument - no jobs.
        if (list == null)
            return jobs;

        for (final String str : list) {
            ComputeJobAdapter job = new ComputeJobAdapter() {
                @Override public Object execute() {
                    try {
                        Thread.sleep(1);
                    }
                    catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                    }

                    if (str == null)
                        return 0;

                    return str.length();
                }
            };

            jobs.add(job);
        }

        return jobs;
    }

    /** {@inheritDoc} */
    @Override public Integer reduce(List<ComputeJobResult> results) {
        int total = 0;

        for (ComputeJobResult jobRes : results) {
            Integer len = jobRes.<Integer>getData();

            total += len;
        }

        return total;
    }
}
/**
 * Test task in which every job sleeps 5 seconds before reporting the length of its string.
 * The task result is the sum of all job results; a job interrupted while sleeping
 * contributes {@code -1}.
 */
private static class SleepTestTask extends ComputeTaskSplitAdapter<List<String>, Integer> {
    /** {@inheritDoc} */
    @Override protected Collection<? extends ComputeJob> split(int gridSize, List<String> list) {
        Collection<ComputeJobAdapter> jobs = new ArrayList<>();

        if (list != null)
            for (final String val : list)
                jobs.add(new ComputeJobAdapter() {
                    @Override public Object execute() {
                        try {
                            Thread.sleep(5000);

                            return val == null ? 0 : val.length();
                        }
                        catch (InterruptedException ignored) {
                            // Restore the interrupt status so the calling thread can still
                            // observe the interruption, as the job in TestTask does.
                            Thread.currentThread().interrupt();

                            return -1;
                        }
                    }
                });

        return jobs;
    }

    /** {@inheritDoc} */
    @Override public Integer reduce(List<ComputeJobResult> results) {
        int sum = 0;

        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();

        return sum;
    }
}
/** JSON to java mapper, shared by {@link HttpTestTask} and {@link SleepHttpTestTask} to parse their string argument. */
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
/**
 * Http test task with restriction to string arguments only.
 * <p>
 * The argument is parsed as JSON; a JSON array is converted to a list of strings and
 * handed over to {@link TestTask}, anything else results in a {@code null} list
 * (and therefore no jobs).
 */
protected static class HttpTestTask extends ComputeTaskSplitAdapter<String, Integer> {
    /** Task doing the actual split and reduce work. */
    private final TestTask delegate = new TestTask();

    /** {@inheritDoc} */
    @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) {
        // The connector interceptor may have appended its suffix to the argument;
        // strip it using the shared constant so both places cannot drift apart.
        if (arg.endsWith(INTERCEPTED_SUF))
            arg = arg.substring(0, arg.length() - INTERCEPTED_SUF.length());

        try {
            JsonNode json = JSON_MAPPER.readTree(arg);

            List<String> list = null;

            if (json.isArray()) {
                list = new ArrayList<>();

                for (JsonNode child : json)
                    list.add(child.asText());
            }

            return delegate.split(gridSize, list);
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public Integer reduce(List<ComputeJobResult> results) {
        return delegate.reduce(results);
    }
}
/**
 * Http wrapper for sleep task: parses the string argument as JSON and forwards the
 * resulting list of strings (or {@code null} when the JSON is not an array) to
 * {@link SleepTestTask}.
 */
protected static class SleepHttpTestTask extends ComputeTaskSplitAdapter<String, Integer> {
    /** Task doing the actual split and reduce work. */
    private final SleepTestTask delegate = new SleepTestTask();

    /** {@inheritDoc} */
    @Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) {
        JsonNode root;

        try {
            root = JSON_MAPPER.readTree(arg);
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }

        List<String> items = null;

        if (root.isArray()) {
            items = new ArrayList<>();

            for (JsonNode elem : root) {
                items.add(elem.asText());
            }
        }

        return delegate.split(gridSize, items);
    }

    /** {@inheritDoc} */
    @Override public Integer reduce(List<ComputeJobResult> results) {
        Integer total = delegate.reduce(results);

        return total;
    }
}
/**
 * Simple HashMap based cache store emulation.
 * <p>
 * The backing map is a plain {@link HashMap} and the class adds no synchronization of its own.
 */
private static class HashMapStore extends CacheStoreAdapter<Object, Object> {
    /** Map for cache store. */
    private final Map<Object, Object> map = new HashMap<>();

    /**
     * Feeds every stored entry to the given closure.
     *
     * @param clo Closure receiving each key-value pair.
     * @param args Ignored.
     */
    @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
        // Parameterized entry type instead of the raw Map.Entry.
        for (Map.Entry<Object, Object> e : map.entrySet())
            clo.apply(e.getKey(), e.getValue());
    }

    /** {@inheritDoc} */
    @Override public Object load(Object key) {
        return map.get(key);
    }

    /** {@inheritDoc} */
    @Override public void write(Cache.Entry<?, ?> e) {
        map.put(e.getKey(), e.getValue());
    }

    /** {@inheritDoc} */
    @Override public void delete(Object key) {
        map.remove(key);
    }
}
}