blob: be744771fb0e2211ad4ddacb500d57cc4d6c6293 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.datastructures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
* Test that joining node is able to take items from queue.
* See GG-2311 for more information.
*/
public abstract class GridCacheQueueJoinedNodeSelfAbstractTest extends IgniteCollectionAbstractTest {
/** */
protected static final int GRID_CNT = 3;
/** */
protected static final int ITEMS_CNT = 300;
/** {@inheritDoc} */
@Override protected int gridCount() {
return GRID_CNT;
}
/**
* @throws Exception If failed.
*/
@Test
public void testTakeFromJoined() throws Exception {
String queueName = UUID.randomUUID().toString();
IgniteQueue<Integer> queue = grid(0).queue(queueName, 0, config(true));
assertNotNull(queue);
assertTrue(queue.isEmpty());
PutJob putJob = new PutJob(queueName);
IgniteCompute comp = compute(grid(0).cluster().forLocal());
IgniteFuture<?> fut = comp.runAsync(putJob);
Collection<IgniteFuture<?>> futs = new ArrayList<>(GRID_CNT - 1);
Collection<TakeJob> jobs = new ArrayList<>(GRID_CNT - 1);
int itemsLeft = ITEMS_CNT;
for (int i = 1; i < GRID_CNT; i++) {
int cnt = ITEMS_CNT / (GRID_CNT - 1);
TakeJob job = new TakeJob(queueName, cnt, 10);
jobs.add(job);
comp = compute(grid(i).cluster().forLocal());
futs.add(comp.callAsync(job));
itemsLeft -= cnt;
}
assertEquals("Not all items will be polled", 0, itemsLeft);
// Wait for half of items to be polled.
for (TakeJob job : jobs)
job.awaitItems();
log.info("Start one more grid.");
Ignite joined = startGrid(GRID_CNT);
// We expect at least one item to be taken.
TakeJob joinedJob = new TakeJob(queueName, 1, 1);
jobs.add(joinedJob);
Integer polled = forLocal(joined).call(joinedJob);
assertNotNull("Joined node should poll item", polled);
info(">>> Joined node polled " + polled);
for (IgniteFuture<?> f : futs)
f.cancel();
putJob.stop(true);
fut.get();
for (TakeJob job : jobs)
job.awaitDone();
}
/**
* Test job putting data to queue.
*/
protected class PutJob implements IgniteRunnable {
/** */
@GridToStringExclude
@IgniteInstanceResource
private Ignite ignite;
/** Queue name. */
private final String queueName;
/** */
private volatile boolean stop;
/**
* @param queueName Queue name.
*/
PutJob(String queueName) {
this.queueName = queueName;
}
/** {@inheritDoc} */
@Override public void run() {
assertNotNull(ignite);
log.info("Running job [node=" + ignite.cluster().localNode().id() +
", job=" + getClass().getSimpleName() + "]");
try {
IgniteQueue<Integer> queue = ignite.queue(queueName, 0, null);
assertNotNull(queue);
int i = 0;
while (!stop)
queue.add(i++);
}
catch (Exception e) {
error("Failed to put value to the queue", e);
fail("Unexpected exception: " + e);
}
log.info("PutJob finished");
}
/**
* @param stop Stop flag.
*/
void stop(boolean stop) {
this.stop = stop;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(PutJob.class, this);
}
}
/**
* Test job putting data to queue.
*/
protected class TakeJob implements IgniteCallable<Integer> {
/** */
@GridToStringExclude
@IgniteInstanceResource
private Ignite ignite;
/** Queue name. */
private final String queueName;
/** Maximum count of items to be taken from queue. */
private final int maxTakeCnt;
/** Latch for waiting for items. */
private final CountDownLatch takeLatch;
/** Latch for waiting for job completion. */
private final CountDownLatch doneLatch;
/**
* @param queueName Queue name.
* @param maxTakeCnt Maximum count of items to be taken from queue.
* @param waitCnt Count of items to
*/
TakeJob(String queueName, int maxTakeCnt, int waitCnt) {
this.queueName = queueName;
this.maxTakeCnt = maxTakeCnt;
takeLatch = new CountDownLatch(waitCnt);
doneLatch = new CountDownLatch(1);
}
/**
* Awaits for a given count of items to be taken.
*
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
*/
private void awaitItems() throws IgniteInterruptedCheckedException {
U.await(takeLatch);
}
/**
* Awaits for a given count of items to be taken.
*
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
*/
private void awaitDone() throws IgniteInterruptedCheckedException {
U.await(doneLatch);
}
/** {@inheritDoc} */
@Nullable @Override public Integer call() {
assertNotNull(ignite);
log.info("Running job [node=" + ignite.cluster().localNode().id() +
", job=" + getClass().getSimpleName() + "]");
Integer lastPolled = null;
try {
IgniteQueue<Integer> queue = ignite.queue(queueName, 0, null);
assertNotNull(queue);
for (int i = 0; i < maxTakeCnt; i++) {
lastPolled = queue.take();
takeLatch.countDown();
}
}
catch (IgniteException e) {
if (e.getCause() instanceof IgniteInterruptedCheckedException || e.getCause() instanceof InterruptedException)
log.info("Cancelling job due to interruption: " + e.getMessage());
else
fail("Unexpected error: " + e);
}
catch (Exception e) {
error("Failed to get value from the queue", e);
}
finally {
doneLatch.countDown();
}
log.info("TakeJob finished, last polled value: " + lastPolled);
return lastPolled;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TakeJob.class, this);
}
}
}