blob: b86cf326af40c3fc210b2d48bd9a8cbd49dcd8aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
 * Load, capacity and performance tests for the thin client.
 */
public class LoadTest {
    /** Per test timeout. */
    @Rule
    public Timeout globalTimeout = new Timeout((int)GridTestUtils.DFLT_TEST_TIMEOUT);

    /**
     * Test thin client in multi-thread environment.
     * <p>
     * Starts one server node and one thin client, then runs {@code THREAD_CNT} workers that all share the same
     * client and the same cache. On every iteration a worker reserves its own range of {@code BATCH_SIZE} keys,
     * stores the batch with {@code putAll}, reads it back through a paged {@link ScanQuery} filtered to that range
     * and checks that the result matches what was written.
     *
     * @throws Exception If the server or the client fails to start, or the waiting thread is interrupted.
     */
    @Test
    public void testMultithreading() throws Exception {
        final int THREAD_CNT = 8;
        final int ITERATION_CNT = 20;
        final int BATCH_SIZE = 1000;
        final int PAGE_CNT = 3;

        IgniteConfiguration srvCfg = Config.getServerConfiguration();

        // No peer class loading from thin clients: we need the server to know about this class to deserialize
        // ScanQuery filter.
        srvCfg.setBinaryConfiguration(new BinaryConfiguration().setTypeConfigurations(Arrays.asList(
            new BinaryTypeConfiguration(getClass().getName()),
            new BinaryTypeConfiguration(SerializedLambda.class.getName())
        )));

        try (Ignite ignored = Ignition.start(srvCfg);
             IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))
        ) {
            ClientCache<Integer, String> cache = client.createCache("testMultithreading");

            // Next free key; every batch atomically reserves its own range of BATCH_SIZE keys.
            AtomicInteger cnt = new AtomicInteger(1);

            // First failure raised by any worker thread.
            AtomicReference<Throwable> error = new AtomicReference<>();

            Runnable assertion = () -> {
                try {
                    int rangeStart = cnt.getAndAdd(BATCH_SIZE);
                    int rangeEnd = rangeStart + BATCH_SIZE;

                    Map<Integer, String> data = IntStream.range(rangeStart, rangeEnd).boxed()
                        .collect(Collectors.toMap(i -> i, i -> String.format("String %s", i)));

                    cache.putAll(data);

                    // The page size is a fraction of the batch, so the cursor has to fetch several pages.
                    Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, String>()
                        .setPageSize(data.size() / PAGE_CNT)
                        .setFilter((i, s) -> i >= rangeStart && i < rangeEnd);

                    try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
                        List<Cache.Entry<Integer, String>> res = cur.getAll();

                        assertEquals("Unexpected number of entries", data.size(), res.size());

                        Map<Integer, String> act = res.stream()
                            .collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue));

                        assertEquals("Unexpected entries", data, act);
                    }
                }
                catch (Throwable ex) {
                    // Record only the first failure so that later ones cannot overwrite it.
                    error.compareAndSet(null, ex);
                }
            };

            CountDownLatch complete = new CountDownLatch(THREAD_CNT);

            Runnable manyAssertions = () -> {
                try {
                    for (int i = 0; i < ITERATION_CNT && error.get() == null; i++)
                        assertion.run();
                }
                finally {
                    // Always release the latch so the main thread never waits for a worker that has already ended.
                    complete.countDown();
                }
            };

            ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_CNT);

            try {
                IntStream.range(0, THREAD_CNT).forEach(t -> threadPool.submit(manyAssertions));

                assertTrue("Timeout", complete.await(180, TimeUnit.SECONDS));
            }
            finally {
                // Stop the workers before the client and the server node are closed by try-with-resources;
                // without this the pool threads outlive the test.
                threadPool.shutdownNow();
            }

            Throwable failure = error.get();

            String errMsg = failure == null ? "" : failure.getMessage();

            assertNull(errMsg, failure);
        }
    }
}