blob: d4604fe5faa6eeb3ff1aabf86883ddc8496c6a13 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.datastructures;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Queue basic tests.
*/
public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollectionAbstractTest {
/** */
private static final int QUEUE_CAPACITY = 3;
/** */
private static final int THREAD_NUM = 2;
/** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testPrepareQueue() throws Exception {
// Random sequence names.
String queueName1 = UUID.randomUUID().toString();
String queueName2 = UUID.randomUUID().toString();
CollectionConfiguration colCfg = config(false);
IgniteQueue queue1 = grid(0).queue(queueName1, 0, colCfg);
IgniteQueue queue2 = grid(0).queue(queueName2, 0, colCfg);
IgniteQueue queue3 = grid(0).queue(queueName1, 0, colCfg);
assertNotNull(queue1);
assertNotNull(queue2);
assertNotNull(queue3);
assert queue1.equals(queue3);
assert queue3.equals(queue1);
assert !queue3.equals(queue2);
queue1.close();
queue2.close();
queue3.close();
assertNull(grid(0).queue(queueName1, 0, null));
assertNull(grid(0).queue(queueName2, 0, null));
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testAddUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
String val = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
assert queue.add(val);
assert val.equals(queue.poll());
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testAddDeleteUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
String val = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
assert queue.add(val);
assert queue.remove(val);
assert queue.isEmpty();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testCollectionMethods() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<SameHashItem> queue = grid(0).queue(queueName, 0, config(false));
int retries = 100;
// Initialize queue.
for (int i = 0; i < retries; i++)
queue.addAll(Arrays.asList(new SameHashItem(Integer.toString(i)), new SameHashItem(Integer.toString(i))));
// Get arrays from queue.
assertEquals(retries * 2, queue.toArray().length);
SameHashItem[] arr2 = new SameHashItem[retries * 3];
Object[] arr3 = queue.toArray(arr2);
assertEquals(arr2, arr3);
assertEquals(arr3[0], new SameHashItem("0"));
// Check queue items.
assertEquals(retries * 2, queue.size());
assertTrue(queue.contains(new SameHashItem(Integer.toString(14))));
assertFalse(queue.contains(new SameHashItem(Integer.toString(144))));
Collection<SameHashItem> col1 = Arrays.asList(new SameHashItem(Integer.toString(14)),
new SameHashItem(Integer.toString(14)), new SameHashItem(Integer.toString(18)));
assertTrue(queue.containsAll(col1));
Collection<SameHashItem> col2 = Arrays.asList(new SameHashItem(Integer.toString(245)),
new SameHashItem(Integer.toString(14)), new SameHashItem(Integer.toString(18)));
assertFalse(queue.containsAll(col2));
// Try to remove item.
assertTrue(queue.remove(new SameHashItem(Integer.toString(14))));
assertEquals((retries * 2) - 1, queue.size());
assertTrue(queue.contains(new SameHashItem(Integer.toString(14))));
assertTrue(queue.remove(new SameHashItem(Integer.toString(14))));
assertEquals((retries - 1) * 2, queue.size());
assertFalse(queue.remove(new SameHashItem(Integer.toString(14))));
// Try to remove some items.
assertTrue(queue.contains(new SameHashItem(Integer.toString(33))));
assertTrue(queue.removeAll(Arrays.asList(new SameHashItem(Integer.toString(15)),
new SameHashItem(Integer.toString(14)), new SameHashItem(Integer.toString(33)),
new SameHashItem(Integer.toString(1)))));
assertFalse(queue.contains(new SameHashItem(Integer.toString(33))));
// Try to retain all items.
assertTrue(queue.retainAll(Arrays.asList(new SameHashItem(Integer.toString(15)),
new SameHashItem(Integer.toString(14)), new SameHashItem(Integer.toString(33)),
new SameHashItem(Integer.toString(1)))));
assertFalse(queue.contains(new SameHashItem(Integer.toString(2))));
assert queue.isEmpty();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testAddPollUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
assert queue.add("1");
assert queue.add("2");
assert queue.add("3");
assertEquals("1", queue.poll());
assertEquals("2", queue.poll());
assertEquals("3", queue.poll());
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testAddPeekUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
String item1 = "1";
assert queue.add(item1);
String item2 = "2";
assert queue.add(item2);
String item3 = "3";
assert queue.add(item3);
assert item1.equals(queue.peek());
assert item1.equals(queue.peek());
assert !item2.equals(queue.peek());
}
/**
* @throws Exception If failed.
*/
public void testIterator() throws Exception {
checkIterator(false);
}
/**
* @throws Exception If failed.
*/
public void testIteratorCollocated() throws Exception {
checkIterator(true);
}
/**
* @param collocated Collocated flag.
*/
private void checkIterator(boolean collocated) {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(collocated));
for (int i = 0; i < 100; i++)
assert queue.add(Integer.toString(i));
Iterator<String> iter1 = queue.iterator();
int cnt = 0;
for (int i = 0; i < 100; i++) {
assertNotNull(iter1.next());
cnt++;
}
assertEquals(100, queue.size());
assertEquals(100, cnt);
assertNotNull(queue.take());
assertNotNull(queue.take());
assertTrue(queue.remove("33"));
assertTrue(queue.remove("77"));
assertEquals(96, queue.size());
Iterator<String> iter2 = queue.iterator();
try {
iter2.remove();
}
catch (IllegalStateException e) {
info("Caught expected exception: " + e);
}
iter2.next();
iter2.remove();
cnt = 0;
while (iter2.hasNext()) {
assertNotNull(iter2.next());
cnt++;
}
assertEquals(95, cnt);
assertEquals(95, queue.size());
iter2.remove();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testPutGetUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
String thName = Thread.currentThread().getName();
for (int i = 0; i < 5; i++) {
queue.put(thName);
queue.peek();
queue.take();
}
assert queue.isEmpty() : queue.size();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testPutGetMultithreadUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
final IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
multithreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
String thName = Thread.currentThread().getName();
for (int i = 0; i < 5; i++) {
queue.put(thName);
queue.peek();
queue.take();
}
return null;
}
}, THREAD_NUM);
assert queue.isEmpty() : queue.size();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testPutGetMultithreadBounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
final IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
multithreaded(new Callable<String>() {
@Override public String call() throws Exception {
String thName = Thread.currentThread().getName();
for (int i = 0; i < QUEUE_CAPACITY * 5; i++) {
queue.put(thName);
queue.peek();
queue.take();
}
return "";
}
}, THREAD_NUM);
assert queue.isEmpty() : queue.size();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testQueueRemoveMultithreadBounded() throws Exception {
// Random queue name.
final String queueName = UUID.randomUUID().toString();
final IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
final CountDownLatch putLatch = new CountDownLatch(THREAD_NUM);
final CountDownLatch clearLatch = new CountDownLatch(THREAD_NUM);
IgniteInternalFuture<?> offerFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
if (log.isDebugEnabled())
log.debug("Thread has been started." + Thread.currentThread().getName());
try {
// Thread must be blocked on put operation.
for (int i = 0; i < (QUEUE_CAPACITY * THREAD_NUM); i++)
queue.offer("anything", 3, TimeUnit.MINUTES);
fail("Queue failed");
}
catch (IgniteException | IllegalStateException e) {
putLatch.countDown();
assert e.getMessage().contains("removed");
assert queue.removed();
}
if (log.isDebugEnabled())
log.debug("Thread has been stopped." + Thread.currentThread().getName());
return null;
}
}, THREAD_NUM, "offer-thread");
IgniteInternalFuture<?> closeFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
IgniteQueue<String> queue = grid(0).queue(queueName, 0, null);
if (queue != null)
queue.close();
}
catch (Exception e) {
fail("Unexpected exception: " + e);
}
finally {
clearLatch.countDown();
}
return null;
}
}, THREAD_NUM, "close-thread");
assert putLatch.await(3, TimeUnit.MINUTES);
assert clearLatch.await(3, TimeUnit.MINUTES);
offerFut.get();
closeFut.get();
try {
assert queue.isEmpty() : queue.size();
fail("Queue must be removed.");
}
catch (IgniteException | IllegalStateException e) {
assert e.getMessage().contains("removed");
assert queue.removed();
}
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testPutRemoveUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
String thread = Thread.currentThread().getName();
for (int i = 0; i < QUEUE_CAPACITY; i++)
queue.put(thread);
info("Finished loop 1: " + thread);
queue.clear();
info("Cleared queue 1: " + thread);
assert queue.isEmpty() : "Queue must be empty. " + queue.size();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testPutRemoveMultiThreadedUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
final IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
multithreaded(
new Callable<String>() {
@Override public String call() throws Exception {
String thread = Thread.currentThread().getName();
for (int i = 0; i < QUEUE_CAPACITY; i++)
queue.put(thread);
info("Finished loop 1: " + thread);
queue.clear();
info("Cleared queue 1: " + thread);
return "";
}
},
THREAD_NUM
);
assert queue.isEmpty() : "Queue must be empty. " + queue.size();
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testPutRemovePeekPollUnbounded() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
for (int i = 0; i < QUEUE_CAPACITY; i++)
queue.put("Item-" + i);
assertEquals(QUEUE_CAPACITY, queue.size());
queue.remove("Item-1");
assertEquals(QUEUE_CAPACITY - 1, queue.size());
assertEquals("Item-0", queue.peek());
assertEquals("Item-0", queue.poll());
assertEquals("Item-2", queue.poll());
assertEquals(0, queue.size());
queue.clear();
assertTrue(queue.isEmpty());
}
/**
* JUnit.
*
* @throws Exception If failed.
*/
public void testRemovePeek() throws Exception {
// Random queue name.
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
for (int i = 0; i < 5; i++)
queue.put("Item-" + i);
queue.remove("Item-1");
assertEquals("Item-0", queue.peek());
queue.remove("Item-2");
assertEquals("Item-0", queue.peek());
queue.remove("Item-0");
assertEquals("Item-3", queue.peek());
queue.remove("Item-4");
assertEquals("Item-3", queue.peek());
queue.remove("Item-3");
assertNull(queue.peek());
}
/**
* @throws Exception If failed.
*/
public void testReuseCache() throws Exception {
CollectionConfiguration colCfg = collectionConfiguration();
IgniteQueue queue1 = grid(0).queue("Queue1", 0, colCfg);
IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg);
assertEquals(getQueueCache(queue1), getQueueCache(queue2));
}
/**
* @throws Exception If failed.
*/
public void testNotReuseCache() throws Exception {
CollectionConfiguration colCfg1 = collectionConfiguration();
CollectionConfiguration colCfg2 = collectionConfiguration();
if (colCfg2.getAtomicityMode() == ATOMIC)
colCfg2.setAtomicityMode(TRANSACTIONAL);
else
colCfg2.setAtomicityMode(ATOMIC);
IgniteQueue queue1 = grid(0).queue("Queue1", 0, colCfg1);
IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg2);
assertNotSame(getQueueCache(queue1), getQueueCache(queue2));
}
/**
* @throws Exception If failed.
*/
public void testFilterNode() throws Exception {
CollectionConfiguration colCfg1 = collectionConfiguration();
CollectionConfiguration colCfg2 = collectionConfiguration();
colCfg2.setNodeFilter(CacheConfiguration.ALL_NODES);
IgniteQueue queue1 = grid(0).queue("Queue1", 0, colCfg1);
IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg2);
assertNotSame(getQueueCache(queue1), getQueueCache(queue2));
colCfg1.setNodeFilter(CacheConfiguration.ALL_NODES);
IgniteQueue queue3 = grid(0).queue("Queue3", 0, colCfg1);
assertEquals(getQueueCache(queue2), getQueueCache(queue3));
}
/**
* @throws Exception If failed.
*/
public void testSystemCache() throws Exception {
CollectionConfiguration colCfg = collectionConfiguration();
IgniteQueue queue = grid(0).queue("Queue1", 0, colCfg);
final CacheConfiguration ccfg = getQueueCache(queue);
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
grid(0).cache(ccfg.getName());
return null;
}
}, IllegalStateException.class, "Failed to get cache because it is a system cache");
assertNotNull(((IgniteKernal)grid(0)).internalCache(ccfg.getName()));
}
/**
* @throws Exception If failed.
*/
public void testAffinityRun() throws Exception {
final CollectionConfiguration colCfg = collectionConfiguration();
colCfg.setCollocated(false);
colCfg.setCacheMode(CacheMode.PARTITIONED);
try (final IgniteQueue<Integer> queue1 = grid(0).queue("Queue1", 0, colCfg)) {
GridTestUtils.assertThrows(
log,
new Callable<Void>() {
@Override public Void call() throws Exception {
queue1.affinityRun(new IgniteRunnable() {
@Override public void run() {
// No-op.
}
});
return null;
}
},
IgniteException.class,
"Failed to execute affinityRun() for non-collocated queue: " + queue1.name() +
". This operation is supported only for collocated queues.");
}
colCfg.setCollocated(true);
try (final IgniteQueue<Integer> queue2 = grid(0).queue("Queue2", 0, colCfg)) {
queue2.add(100);
queue2.affinityRun(new IgniteRunnable() {
@IgniteInstanceResource
private IgniteEx ignite;
@Override public void run() {
assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
ignite.cluster().localNode(), "Queue2"));
assertEquals(100, queue2.take().intValue());
}
});
}
}
/**
* @throws Exception If failed.
*/
public void testAffinityCall() throws Exception {
final CollectionConfiguration colCfg = collectionConfiguration();
colCfg.setCollocated(false);
colCfg.setCacheMode(CacheMode.PARTITIONED);
try (final IgniteQueue<Integer> queue1 = grid(0).queue("Queue1", 0, colCfg)) {
GridTestUtils.assertThrows(
log,
new Callable<Void>() {
@Override public Void call() throws Exception {
queue1.affinityCall(new IgniteCallable<Object>() {
@Override public Object call() {
return null;
}
});
return null;
}
},
IgniteException.class,
"Failed to execute affinityCall() for non-collocated queue: " + queue1.name() +
". This operation is supported only for collocated queues.");
}
colCfg.setCollocated(true);
try (final IgniteQueue<Integer> queue2 = grid(0).queue("Queue2", 0, colCfg)) {
queue2.add(100);
Integer res = queue2.affinityCall(new IgniteCallable<Integer>() {
@IgniteInstanceResource
private IgniteEx ignite;
@Override public Integer call() {
assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
ignite.cluster().localNode(), "Queue2"));
return queue2.take();
}
});
assertEquals(100, res.intValue());
}
}
/**
* Implementation of ignite data structures internally uses special system caches, need make sure
* that transaction on these system caches do not intersect with transactions started by user.
*
* @throws Exception If failed.
*/
public void testIsolation() throws Exception {
Ignite ignite = grid(0);
CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
cfg.setName("myCache");
cfg.setAtomicityMode(TRANSACTIONAL);
cfg.setWriteSynchronizationMode(FULL_SYNC);
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cfg);
try {
String queueName = UUID.randomUUID().toString();
IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
try (Transaction tx = ignite.transactions().txStart()) {
cache.put(1, 1);
for (int i = 0; i < QUEUE_CAPACITY; i++)
queue.put("Item-" + i);
tx.rollback();
}
assertEquals(0, cache.size());
assertEquals(QUEUE_CAPACITY, queue.size());
queue.remove("Item-1");
assertEquals(QUEUE_CAPACITY - 1, queue.size());
assertEquals("Item-0", queue.peek());
assertEquals("Item-0", queue.poll());
assertEquals("Item-2", queue.poll());
assertEquals(0, queue.size());
queue.clear();
assertTrue(queue.isEmpty());
}
finally {
ignite.destroyCache(cfg.getName());
}
}
/**
* Test class with the same hash code.
*/
private static class SameHashItem implements Serializable {
/** Data field*/
private final String s;
/**
* @param s Item data.
*/
private SameHashItem(String s) {
this.s = s;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return 0;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o instanceof SameHashItem) {
SameHashItem i = (SameHashItem)o;
return s.equals(i.s);
}
return false;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SameHashItem.class, this);
}
}
}