| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.datastructures; |
| |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteQueue; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CollectionConfiguration; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.datastructures.GridCacheQueueHeaderKey; |
| import org.apache.ignite.internal.util.typedef.PAX; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| |
| /** |
| * Tests cleanup of orphaned queue items. |
| */ |
| public class GridCacheQueueCleanupSelfTest extends IgniteCollectionAbstractTest { |
| /** */ |
| private static final String QUEUE_NAME1 = "CleanupTestQueue1"; |
| |
| /** */ |
| private static final String QUEUE_NAME2 = "CleanupTestQueue2"; |
| |
| /** {@inheritDoc} */ |
| @Override protected int gridCount() { |
| return 4; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheMode collectionCacheMode() { |
| return PARTITIONED; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { |
| return TRANSACTIONAL; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CollectionConfiguration collectionConfiguration() { |
| CollectionConfiguration colCfg = super.collectionConfiguration(); |
| |
| colCfg.setBackups(0); |
| |
| return colCfg; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testCleanup() throws Exception { |
| IgniteQueue<Integer> queue = grid(0).queue(QUEUE_NAME1, 0, config(false)); |
| |
| GridCacheContext cctx = GridTestUtils.getFieldValue(queue, "cctx"); |
| |
| final String queueCacheName = cctx.name(); |
| |
| ClusterNode node = grid(0).affinity(queueCacheName).mapKeyToNode(new GridCacheQueueHeaderKey(QUEUE_NAME1)); |
| |
| final Ignite ignite = grid(0).localNode().equals(node) ? grid(1) : grid(0); |
| |
| /* |
| assertNotNull(queue); |
| |
| // Add/poll some items. |
| |
| for (int i = 0; i < 500; i++) |
| queue.add(i); |
| |
| for (int i = 0; i < 10; i++) |
| queue.poll(); |
| |
| assertTrue(!queue.isEmpty()); |
| |
| // Kill node containing queue header. |
| |
| final String killGridName = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME); |
| |
| stopGrid(killGridName); |
| |
| assertNull(((IgniteKernal)grid).cache(DEFAULT_CACHE_NAME).dataStructures().queue(QUEUE_NAME1, 0, false, false)); |
| |
| final AtomicBoolean stop = new AtomicBoolean(false); |
| |
| GridFuture<?> fut1; |
| GridFuture<?> fut2; |
| |
| try { |
| // Start threads using cache concurrently with cleanup thread. |
| fut1 = startAddPollThread(grid, stop, QUEUE_NAME1); |
| fut2 = startAddPollThread(grid, stop, QUEUE_NAME2); |
| |
| U.sleep(3000); // Give some time for cleanup thread. |
| } |
| finally { |
| stop.set(true); |
| } |
| |
| fut1.get(); |
| fut2.get(); |
| |
| ((IgniteKernal)grid).cache(DEFAULT_CACHE_NAME).dataStructures().removeQueue(QUEUE_NAME1); |
| ((IgniteKernal)grid).cache(DEFAULT_CACHE_NAME).dataStructures().removeQueue(QUEUE_NAME2); |
| |
| assertTrue(GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() { |
| for (int i = 0; i < gridCount(); i++) { |
| if (getTestIgniteInstanceName(i).equals(killGridName)) |
| continue; |
| |
| Iterator<GridCacheEntryEx<Object, Object>> entries = |
| ((GridKernal)grid(i)).context().cache().internalCache(DEFAULT_CACHE_NAME).map().allEntries0().iterator(); |
| |
| if (entries.hasNext()) { |
| log.info("Found cache entries, will wait: " + entries.next()); |
| |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| }, 5000)); |
| |
| startGrid(killGridName); |
| |
| // Create queue again. |
| queue = ((IgniteKernal)grid).cache(DEFAULT_CACHE_NAME).dataStructures().queue(QUEUE_NAME1, 0, false, true); |
| */ |
| |
| assertEquals(0, queue.size()); |
| |
| for (int i = 0; i < 500; i++) |
| queue.add(i); |
| |
| assertEquals(500, queue.size()); |
| |
| // Remove queue and create queue with the same name. |
| queue.close(); |
| |
| queue = ignite.queue(QUEUE_NAME1, 0, config(false)); |
| |
| assertEquals(0, queue.size()); |
| |
| for (int i = 0; i < 500; i++) |
| queue.add(i); |
| |
| assertEquals(500, queue.size()); |
| |
| // Check that items of removed queue are removed, items of new queue not. |
| assertTrue(GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() throws IgniteCheckedException { |
| int cnt = 0; |
| |
| for (int i = 0; i < gridCount(); i++) { |
| GridCacheAdapter<Object, Object> cache = |
| grid(i).context().cache().internalCache(queueCacheName); |
| |
| for (Object e : cache.localEntries(new CachePeekMode[]{CachePeekMode.ALL})) |
| cnt++; |
| } |
| |
| if (cnt > 501) { // 500 items + header. |
| log.info("Found more cache entries than expected, will wait: " + cnt); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| }, 5000)); |
| |
| for (int i = 0; i < 500; i++) |
| assertEquals((Integer)i, queue.poll()); |
| } |
| |
| /** |
| * @param ignite Grid. |
| * @param stop Stop flag. |
| * @param queueName Queue name. |
| * @return Future completing when thread finishes. |
| */ |
| private IgniteInternalFuture<?> startAddPollThread(final Ignite ignite, final AtomicBoolean stop, final String queueName) { |
| return GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| IgniteQueue<Integer> queue = ignite.queue(queueName, 0, config(false)); |
| |
| assertEquals(0, queue.size()); |
| |
| for (int i = 0; i < 10; i++) |
| assertTrue(queue.add(i)); |
| |
| while (!stop.get()) { |
| for (int i = 0; i < 100; i++) |
| assertTrue(queue.add(i)); |
| |
| for (int i = 0; i < 100; i++) |
| assertNotNull(queue.poll()); |
| } |
| |
| assertEquals(10, queue.size()); |
| |
| return null; |
| } |
| }); |
| } |
| } |