blob: ee5560839d3f2b78bb609da28a91c94156973797 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.datastructures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
/**
* Queue multi node tests.
*/
public abstract class GridCacheQueueMultiNodeAbstractSelfTest extends IgniteCollectionAbstractTest {
/** Number of grid nodes the tests expect in the topology. */
private static final int GRID_CNT = 4;

/** Number of operations each job performs; also used to size the node thread pools. */
private static final int RETRIES = 20;

/** Capacity of the bounded queues created by most tests. */
protected static final int QUEUE_CAPACITY = 100_000;

/** Source of the values that {@link PutJob} and {@link AddAllJob} add to queues. Never reassigned. */
private static final AtomicInteger cntr = new AtomicInteger();

/** Number of items put by the producer in {@link #testPutPollCollocated()}. */
private static final int ITEMS_CNT = 50;
/**
 * {@inheritDoc}
 *
 * @return {@link #GRID_CNT}.
 */
@Override protected int gridCount() {
    return GRID_CNT;
}
/**
 * {@inheritDoc}
 * <p>
 * Additionally waits until every started node sees exactly {@link #GRID_CNT} nodes. The topology is checked
 * up to five times with a one second pause between attempts, and the test fails if it never becomes consistent.
 *
 * @throws Exception If failed.
 */
@SuppressWarnings("BusyWait")
@Override protected void beforeTest() throws Exception {
    super.beforeTest();

    final int attempts = 5;

    boolean success = false;

    // Stop as soon as one full pass over all nodes succeeds.
    for (int i = 0; i < attempts && !success; i++) {
        // Pause only between attempts; the first check runs immediately.
        if (i > 0)
            Thread.sleep(1000);

        success = true;

        for (Ignite g : G.allGrids()) {
            int nodesCnt = g.cluster().nodes().size();

            if (nodesCnt != GRID_CNT) {
                info("Grid has incorrect nodes count [igniteInstanceName=" + g.name() +
                    ", nodesCnt=" + nodesCnt + ']');

                success = false;

                break;
            }
        }
    }

    // A JUnit assertion is enforced even when JVM assertions (-ea) are disabled.
    assertTrue("Topology is not consistent: not every node sees " + GRID_CNT + " nodes.", success);

    info("Topology is consistent.");
}
/**
 * {@inheritDoc}
 * <p>
 * Sets the public and system thread pool sizes to twice {@link #RETRIES} and clears the connector configuration.
 */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    final int poolSize = RETRIES * 2;

    IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);

    igniteCfg.setConnectorConfiguration(null);
    igniteCfg.setSystemThreadPoolSize(poolSize);
    igniteCfg.setPublicThreadPoolSize(poolSize);

    return igniteCfg;
}
/**
 * Broadcasts {@link PutJob} and checks that the queue ends up with {@code GRID_CNT * RETRIES} items.
 *
 * @throws Exception If failed.
 */
@Test
public void testPut() throws Exception {
    String queueName = UUID.randomUUID().toString();

    // try-with-resources closes the queue even when an assertion fails.
    try (IgniteQueue<Integer> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false))) {
        assertTrue(queue.isEmpty());

        grid(0).compute().broadcast(new PutJob(queueName, RETRIES));

        assertEquals(GRID_CNT * RETRIES, queue.size());
    }
}
/**
 * Starts two extra nodes working on a collocated queue of capacity 5: the first puts {@link #ITEMS_CNT} items,
 * the second starts polling them once the producer has put as many items as the queue capacity.
 * The extra nodes are stopped and the queue is closed regardless of the test outcome.
 *
 * @throws Exception If failed.
 */
@Test
public void testPutPollCollocated() throws Exception {
    final String queueName = UUID.randomUUID().toString();

    try {
        info("Queue name: " + queueName);

        grid(0).queue(queueName, 5, config(true));

        final CountDownLatch latch = new CountDownLatch(1);

        final Ignite g = startGrid(GRID_CNT + 1);

        IgniteInternalFuture<Object> fut1 = GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() throws Exception {
                try {
                    info(">>> Executing put callable [node=" + g.cluster().localNode().id() +
                        ", thread=" + Thread.currentThread().getName() + ']');

                    IgniteQueue<Integer> q = g.queue(queueName, 5, config(true));

                    assertTrue(q.isEmpty());

                    for (int i = 0; i < ITEMS_CNT; i++) {
                        // The queue is full at this point: let the consumer start.
                        if (i == q.capacity()) {
                            info(">>> Opening latch...");

                            latch.countDown();
                        }

                        put(q, i);
                    }

                    info(">>> Finished put callable on node: " + g.cluster().localNode().id());

                    return null;
                }
                finally {
                    // Release the main thread even if this callable failed before opening the latch;
                    // otherwise latch.await() below would block forever. A repeated countDown() is a no-op.
                    latch.countDown();
                }
            }
        });

        latch.await();

        final Ignite g1 = startGrid(GRID_CNT + 2);

        IgniteInternalFuture<Object> fut2 = GridTestUtils.runAsync(new Callable<Object>() {
            @SuppressWarnings("BusyWait")
            @Override public Object call() throws Exception {
                try {
                    info(">>> Executing poll callable [node=" + g1.cluster().localNode().id() +
                        ", thread=" + Thread.currentThread().getName() + ']');

                    IgniteQueue<Integer> q = g1.queue(queueName, 5, config(true));

                    int cnt = 0;
                    int nullCnt = 0;

                    do {
                        Integer i = q.poll();

                        info("Polled value: " + i);

                        if (i != null) {
                            cnt++;

                            nullCnt = 0;
                        }
                        else {
                            // Give up after three consecutive empty polls, one second apart.
                            if (nullCnt == 3)
                                throw new Exception("Failed to poll non-null value within 3 attempts.");

                            nullCnt++;

                            Thread.sleep(1000);
                        }
                    }
                    while (cnt < ITEMS_CNT);

                    info("Finished poll callable on node: " + g1.cluster().localNode().id());

                    return null;
                }
                finally {
                    info("Poll callable finished.");
                }
            }
        });

        fut1.get();
        fut2.get();
    }
    finally {
        try {
            // With a null configuration the queue is only looked up, so it may be null if creation failed.
            IgniteQueue<Integer> queue = grid(0).queue(queueName, 0, null);

            if (queue != null)
                queue.close();
        }
        finally {
            stopGrid(GRID_CNT + 1);
            stopGrid(GRID_CNT + 2);

            checkTopology(GRID_CNT);

            awaitPartitionMapExchange();
        }
    }
}
/**
 * Broadcasts {@link AddAllJob}, checks that the queue holds {@code GRID_CNT * RETRIES} items,
 * then clears it in batches of 5 and checks that it is empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testAddAll() throws Exception {
    String queueName = UUID.randomUUID().toString();

    info("Queue name: " + queueName);

    // Failures are no longer caught and logged here: they must propagate so the test can actually fail.
    try (IgniteQueue<Integer> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false))) {
        assertTrue(queue.isEmpty());

        // Broadcast so the job runs on every node; a single call() would add only RETRIES items.
        grid(0).compute().broadcast(new AddAllJob(queueName, RETRIES));

        assertEquals(GRID_CNT * RETRIES, queue.size());

        queue.clear(5);

        assertEquals(0, queue.size());
    }
}
/**
 * Puts a single value into the queue and runs {@link GetJob}, which repeatedly reads the queue head
 * without removing it; the queue must still contain exactly one item afterwards.
 *
 * @throws Exception If failed.
 */
@Test
public void testGet() throws Exception {
    String queueName = UUID.randomUUID().toString();

    info("Queue name: " + queueName);

    // try-with-resources closes the queue even when an assertion or the job fails.
    try (IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false))) {
        assertTrue(queue.isEmpty());

        String val = UUID.randomUUID().toString();

        queue.put(val);

        grid(0).compute().call(new GetJob(queueName, config(false), RETRIES, val));

        assertEquals(1, queue.size());
    }
}
/**
 * Broadcasts {@link PutTakeJob}, in which every put is followed by a take, and checks that
 * the queue is empty afterwards.
 *
 * @throws Exception If failed.
 */
@Test
public void testPutTake() throws Exception {
    String queueName = UUID.randomUUID().toString();

    info("Queue name: " + queueName);

    // try-with-resources closes the queue even when an assertion or the job fails.
    try (IgniteQueue<Integer> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false))) {
        assertTrue(queue.isEmpty());

        grid(0).compute().broadcast(new PutTakeJob(queueName, RETRIES));

        assertEquals(0, queue.size());
    }
}
/**
 * Runs the multi-node add scenario first for a collocated queue, then for a non-collocated one.
 *
 * @throws Exception If failed.
 */
@Test
public void testAddMultinode() throws Exception {
    for (boolean collocated : new boolean[] {true, false})
        testAddMultinode(collocated);
}
/**
 * Adds the values {@code 0..ITEMS_PER_THREAD-1} from several threads on every node, then drains the queue
 * from node 0 and checks that each value was stored once per producer thread.
 *
 * @param collocated Collocation flag.
 * @throws Exception If failed.
 */
private void testAddMultinode(final boolean collocated) throws Exception {
    final String queueName = UUID.randomUUID().toString();

    info("Queue name: " + queueName + ", collocated: " + collocated);

    try {
        Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();

        final int THREADS_PER_NODE = 3;
        final int ITEMS_PER_THREAD = 1000;

        for (int i = 0; i < GRID_CNT; i++) {
            final int idx = i;

            futs.add(GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    CollectionConfiguration colCfg = config(collocated);

                    IgniteQueue<Integer> queue = grid(idx).queue(queueName, 0, colCfg);

                    for (int j = 0; j < ITEMS_PER_THREAD; j++)
                        assertTrue(queue.add(j));

                    return null;
                }
            }, THREADS_PER_NODE, "testPutMultiNode"));
        }

        for (IgniteInternalFuture<?> fut : futs)
            fut.get();

        IgniteQueue<Integer> queue = grid(0).queue(queueName, 0, null);

        assertNotNull(queue);

        assertEquals(THREADS_PER_NODE * ITEMS_PER_THREAD * GRID_CNT, queue.size());

        // items[v] counts how many times value v was polled.
        int[] items = new int[ITEMS_PER_THREAD];

        Integer item;

        while ((item = queue.poll()) != null)
            items[item]++;

        for (int i = 0; i < ITEMS_PER_THREAD; i++)
            assertEquals(THREADS_PER_NODE * GRID_CNT, items[i]);
    }
    finally {
        // With a null configuration the queue is only looked up; guard against null so that
        // a failure above is not masked by a NullPointerException here.
        IgniteQueue<Integer> queue = grid(0).queue(queueName, 0, null);

        if (queue != null)
            queue.close();
    }
}
/**
 * Runs the concurrent add/poll scenario first for a collocated queue, then for a non-collocated one.
 *
 * @throws Exception If failed.
 */
@Test
public void testAddPollMultinode() throws Exception {
    for (boolean collocated : new boolean[] {true, false})
        testAddPollMultinode(collocated);
}
/**
 * Runs producer and consumer threads on every node concurrently. Producers add the values
 * {@code 0..ITEMS_PER_THREAD-1}; consumers poll until told to stop. Afterwards the remaining items are
 * drained from node 0 and the test checks that every value was seen exactly once per producer thread.
 *
 * @param collocated Collocation flag.
 * @throws Exception If failed.
 */
private void testAddPollMultinode(final boolean collocated) throws Exception {
    final String queueName = UUID.randomUUID().toString();

    info("Queue name: " + queueName + ", collocated: " + collocated);

    try {
        Collection<IgniteInternalFuture<?>> putFuts = new ArrayList<>();
        Collection<IgniteInternalFuture<?>> pollFuts = new ArrayList<>();

        final int PUT_THREADS_PER_NODE = 3;
        final int POLL_THREADS_PER_NODE = 2;
        final int ITEMS_PER_THREAD = 1000;

        final AtomicBoolean stopPoll = new AtomicBoolean();

        // One counter array per polling thread; merged after all pollers have finished.
        Collection<int[]> pollData = new ArrayList<>();

        for (int i = 0; i < GRID_CNT; i++) {
            final int idx = i;

            putFuts.add(GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    CollectionConfiguration colCfg = config(collocated);

                    IgniteQueue<Integer> queue = grid(idx).queue(queueName, 0, colCfg);

                    for (int k = 0; k < ITEMS_PER_THREAD; k++)
                        assertTrue(queue.add(k));

                    return null;
                }
            }, PUT_THREADS_PER_NODE, "testAddPollMultinode"));

            for (int j = 0; j < POLL_THREADS_PER_NODE; j++) {
                final int[] items = new int[ITEMS_PER_THREAD];

                pollData.add(items);

                pollFuts.add(GridTestUtils.runAsync(new Callable<Void>() {
                    @Override public Void call() throws Exception {
                        CollectionConfiguration colCfg = config(collocated);

                        IgniteQueue<Integer> queue = grid(idx).queue(queueName, 0, colCfg);

                        while (!stopPoll.get()) {
                            Integer val = queue.poll();

                            if (val != null)
                                items[val]++;
                        }

                        return null;
                    }
                }));
            }
        }

        try {
            for (IgniteInternalFuture<?> fut : putFuts)
                fut.get();
        }
        finally {
            // Always signal the pollers, even when a producer failed, so their loops terminate.
            stopPoll.set(true);
        }

        for (IgniteInternalFuture<?> fut : pollFuts)
            fut.get();

        CollectionConfiguration colCfg = config(collocated);

        IgniteQueue<Integer> queue = grid(0).queue(queueName, 0, colCfg);

        // Items the pollers did not get to before they were stopped.
        int[] resItems = new int[ITEMS_PER_THREAD];

        Integer item;

        while ((item = queue.poll()) != null)
            resItems[item]++;

        for (int[] items : pollData) {
            for (int i = 0; i < ITEMS_PER_THREAD; i++)
                resItems[i] += items[i];
        }

        for (int i = 0; i < ITEMS_PER_THREAD; i++)
            assertEquals(PUT_THREADS_PER_NODE * GRID_CNT, resItems[i]);

        assertTrue(queue.isEmpty());
    }
    finally {
        // With a null configuration the queue is only looked up; guard against null so that
        // a failure above is not masked by a NullPointerException here.
        IgniteQueue<Integer> queue = grid(0).queue(queueName, 0, null);

        if (queue != null)
            queue.close();
    }
}
/**
 * Fills the queue by broadcasting {@link AddAllJob}, then iterates it on every cluster node and checks
 * that each node sees the same elements in the same order as node 0.
 *
 * @throws Exception If failed.
 */
@Test
public void testIterator() throws Exception {
    final String name = UUID.randomUUID().toString();

    info("Queue name: " + name);

    try (IgniteQueue<Integer> queue = grid(0).queue(name, QUEUE_CAPACITY, config(false))) {
        assertTrue(queue.isEmpty());

        grid(0).compute().broadcast(new AddAllJob(name, RETRIES));

        assertEquals(GRID_CNT * RETRIES, queue.size());

        for (ClusterNode node : grid(0).cluster().nodes()) {
            // Job that collects the queue content via its iterator on the node where it runs.
            IgniteCallable<Collection<Integer>> iterJob = new IgniteCallable<Collection<Integer>>() {
                @IgniteInstanceResource
                private Ignite grid;

                /** {@inheritDoc} */
                @Override public Collection<Integer> call() throws Exception {
                    grid.log().info("Running job [node=" + grid.cluster().localNode().id() + "]");

                    IgniteQueue<Integer> locQueue = grid.queue(name, 0, null);

                    grid.log().info("Queue size " + locQueue.size());

                    Collection<Integer> collected = new ArrayList<>();

                    for (Integer elem : locQueue)
                        collected.add(elem);

                    grid.log().info("Returning: " + collected);

                    return collected;
                }
            };

            Collection<Integer> nodeElements = compute(grid(0).cluster().forNode(node)).call(iterJob);

            assertTrue(F.eqOrdered(queue, nodeElements));
        }
    }
}
/**
 * Sends the queue instance itself inside a broadcast job and checks that every node reports
 * the same size for it.
 *
 * @throws Exception If failed.
 */
@Test
public void testSerialization() throws Exception {
    final int itemsCnt = 10;

    // Queue with a random name.
    final IgniteQueue<Integer> queue = grid(0).queue(UUID.randomUUID().toString(), 0, config(false));

    assertNotNull(queue);

    try {
        int next = 0;

        while (next < itemsCnt)
            queue.add(next++);

        Collection<Integer> sizes = grid(0).compute().broadcast(new QueueJob(queue));

        assertEquals(GRID_CNT, sizes.size());

        for (Integer reported : sizes)
            assertEquals(Integer.valueOf(itemsCnt), reported);
    }
    finally {
        queue.close();
    }
}
/**
 * Puts a value into the queue, printing a message before and after the put.
 *
 * @param q Queue to put into.
 * @param v Value to put.
 * @param <T> Queue element type.
 */
private static <T> void put(IgniteQueue<T> q, T v) {
    X.println("Putting value: " + v);

    q.put(v);

    X.println("Done putting value: " + v);
}
/**
 * Job that carries a queue instance as a field and returns its size.
 * Used to check that a queue can be sent to other nodes as part of a job.
 */
private static class QueueJob implements IgniteCallable<Integer> {
    /** Queue sent together with the job. */
    private final IgniteQueue<Integer> queue;

    /**
     * @param queue Queue.
     */
    private QueueJob(IgniteQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * {@inheritDoc}
     *
     * @return Size of the queue as seen by the node executing the job.
     */
    @Override public Integer call() throws Exception {
        assertNotNull(queue);

        return queue.size();
    }
}
/**
 * Test job that puts a fixed number of counter values into an existing queue.
 */
protected static class PutJob implements IgniteCallable<Integer> {
    /** Ignite instance injected on the executing node. */
    @GridToStringExclude
    @IgniteInstanceResource
    private Ignite ignite;

    /** Queue name. */
    private final String queueName;

    /** Number of put operations to perform. */
    private final int retries;

    /**
     * @param queueName Queue name.
     * @param retries Number of operations.
     */
    PutJob(String queueName, int retries) {
        this.retries = retries;
        this.queueName = queueName;
    }

    /**
     * {@inheritDoc}
     *
     * @return Queue size observed after all puts of this job.
     */
    @Override public Integer call() throws IgniteCheckedException {
        assertNotNull(ignite);

        ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");

        // Null configuration: look up the existing queue, do not create one.
        IgniteQueue<Integer> q = ignite.queue(queueName, 0, null);

        assertNotNull(q);

        int left = retries;

        while (left-- > 0)
            q.put(cntr.getAndIncrement());

        return q.size();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(PutJob.class, this);
    }
}
/**
 * Test job that adds a batch of counter values to an existing queue with a single {@code addAll} call.
 */
protected static class AddAllJob implements IgniteCallable<Integer> {
    /** Ignite instance injected on the executing node. */
    @GridToStringExclude
    @IgniteInstanceResource
    private Ignite ignite;

    /** Queue name. */
    private final String queueName;

    /** Number of items in the batch. */
    private final int size;

    /**
     * @param queueName Queue name.
     * @param size Number of add items.
     */
    AddAllJob(String queueName, int size) {
        this.size = size;
        this.queueName = queueName;
    }

    /**
     * {@inheritDoc}
     *
     * @return Queue size observed after the batch was added.
     */
    @Override public Integer call() throws IgniteCheckedException {
        assertNotNull(ignite);

        ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");

        // Null configuration: look up the existing queue, do not create one.
        IgniteQueue<Integer> q = ignite.queue(queueName, 0, null);

        assertNotNull(q);

        Collection<Integer> batch = new ArrayList<>();

        int added = 0;

        while (added < size) {
            batch.add(cntr.getAndIncrement());

            added++;
        }

        q.addAll(batch);

        return q.size();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(AddAllJob.class, this);
    }
}
/**
 * Test job that repeatedly reads the head of a single-element queue via {@code peek()} and
 * {@code element()} and checks that it equals the expected value.
 */
protected static class GetJob implements IgniteCallable<Integer> {
    /** Ignite instance injected on the executing node. */
    @GridToStringExclude
    @IgniteInstanceResource
    private Ignite ignite;

    /** Queue name. */
    private final String queueName;

    /** Number of read iterations. */
    private final int retries;

    /** Value expected at the head of the queue. */
    private final String expVal;

    /** Configuration used to obtain the queue. */
    private final CollectionConfiguration colCfg;

    /**
     * @param queueName Queue name.
     * @param colCfg Collection configuration.
     * @param retries Number of operations.
     * @param expVal Expected value.
     */
    GetJob(String queueName, CollectionConfiguration colCfg, int retries, String expVal) {
        this.expVal = expVal;
        this.retries = retries;
        this.colCfg = colCfg;
        this.queueName = queueName;
    }

    /**
     * {@inheritDoc}
     *
     * @return Queue size observed after all reads.
     */
    @Override public Integer call() throws IgniteCheckedException {
        assertNotNull(ignite);

        ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");

        IgniteQueue<String> q = ignite.queue(queueName, QUEUE_CAPACITY, colCfg);

        assertNotNull(q);

        assertEquals(1, q.size());

        int iter = 0;

        while (iter < retries) {
            assertEquals(expVal, q.peek());
            assertEquals(expVal, q.element());

            iter++;
        }

        return q.size();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GetJob.class, this);
    }
}
/**
 * Test job that alternates {@code put} and {@code take} on an existing queue.
 */
protected static class PutTakeJob implements IgniteCallable<Integer> {
    /** Ignite instance injected on the executing node. */
    @GridToStringExclude
    @IgniteInstanceResource
    private Ignite ignite;

    /** Queue name. */
    private final String queueName;

    /** Number of put/take pairs to perform. */
    private final int retries;

    /**
     * @param queueName Queue name.
     * @param retries Number of operations.
     */
    PutTakeJob(String queueName, int retries) {
        this.retries = retries;
        this.queueName = queueName;
    }

    /**
     * {@inheritDoc}
     *
     * @return Queue size observed after all put/take pairs of this job.
     */
    @Override public Integer call() throws IgniteCheckedException {
        assertNotNull(ignite);

        ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + ']');

        // Null configuration: look up the existing queue, do not create one.
        IgniteQueue<Integer> q = ignite.queue(queueName, 0, null);

        assertNotNull(q);

        int iter = 0;

        while (iter < retries) {
            q.put(iter);

            assertNotNull(q.take());

            iter++;
        }

        return q.size();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(PutTakeJob.class, this);
    }
}
}