blob: 4764bd33b6bf09c45026ebd13d2fd62365bab129 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
/**
* Test behavior of jobs when master node has failed, but job class implements {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware}
* interface.
*/
@GridCommonTest(group = "Task Session")
public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
/** Total grid count within the cloud. */
private static final int GRID_CNT = 2;
/** Counts how many times master-leave interface implementation was called. */
private static volatile CountDownLatch invokeLatch;
/** Latch which blocks job execution until main thread has sent node fail signal. */
private static volatile CountDownLatch latch;
/** Latch which blocks main thread until all jobs start their execution. */
private static volatile CountDownLatch jobLatch;
/** Should job wait for callback. */
private static volatile boolean awaitMasterLeaveCallback = true;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
awaitMasterLeaveCallback = true;
latch = new CountDownLatch(1);
jobLatch = new CountDownLatch(GRID_CNT - 1);
invokeLatch = new CountDownLatch(GRID_CNT - 1);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setCommunicationSpi(new CommunicationSpi());
CacheConfiguration ccfg = defaultCacheConfiguration();
ccfg.setCacheMode(PARTITIONED);
cfg.setCacheConfiguration(ccfg);
return cfg;
}
/**
* Get predicate which allows task execution on all nodes except the last one.
*
* @return Predicate.
*/
private IgnitePredicate<ClusterNode> excludeLastPredicate() {
return new IgnitePredicate<ClusterNode>() {
@Override public boolean apply(ClusterNode e) {
return !e.id().equals(grid(GRID_CNT - 1).localNode().id());
}
};
}
/**
* Constructor.
*/
public GridJobMasterLeaveAwareSelfTest() {
super(/* don't start grid */ false);
}
/**
* Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked on job which is initiated by
* master and is currently running on it.
*
* @throws Exception If failed.
*/
@Test
public void testLocalJobOnMaster() throws Exception {
invokeLatch = new CountDownLatch(1);
jobLatch = new CountDownLatch(1);
Ignite g = startGrid(0);
g.compute().executeAsync(new TestTask(1), null);
jobLatch.await();
// Count down the latch in a separate thread.
new Thread(new Runnable() {
@Override public void run() {
try {
U.sleep(500);
}
catch (IgniteInterruptedCheckedException ignore) {
// No-op.
}
latch.countDown();
}
}).start();
stopGrid(0, true);
latch.countDown();
assert invokeLatch.await(5000, MILLISECONDS);
}
/**
* Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked
* when master node leaves topology normally.
*
* @throws Exception If failed.
*/
@Test
public void testMasterStoppedNormally() throws Exception {
// Start grids.
for (int i = 0; i < GRID_CNT; i++)
startGrid(i);
int lastGridIdx = GRID_CNT - 1;
compute(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()))
.executeAsync(new TestTask(GRID_CNT - 1), null);
jobLatch.await();
stopGrid(lastGridIdx, true);
latch.countDown();
assert invokeLatch.await(5000, MILLISECONDS);
}
/**
* Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when master node leaves topology
* abruptly (e.g. due to a network failure or immediate node shutdown).
*
* @throws Exception If failed.
*/
@Test
public void testMasterStoppedAbruptly() throws Exception {
// Start grids.
for (int i = 0; i < GRID_CNT; i++)
startGrid(i);
int lastGridIdx = GRID_CNT - 1;
compute(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()))
.executeAsync(new TestTask(GRID_CNT - 1), null);
jobLatch.await();
((CommunicationSpi)grid(lastGridIdx).configuration().getCommunicationSpi()).blockMessages();
stopGrid(lastGridIdx, true);
latch.countDown();
assert invokeLatch.await(5000, MILLISECONDS);
}
/**
* Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked when fails to send
* {@link GridJobExecuteResponse} to master node.
*
* @throws Exception If failed.
*/
@Test
public void testCannotSendJobExecuteResponse() throws Exception {
awaitMasterLeaveCallback = false;
// Start grids.
for (int i = 0; i < GRID_CNT; i++)
startGrid(i);
int lastGridIdx = GRID_CNT - 1;
compute(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()))
.executeAsync(new TestTask(GRID_CNT - 1), null);
jobLatch.await();
for (int i = 0; i < lastGridIdx; i++)
((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).waitLatch();
latch.countDown();
// Ensure that all worker nodes has already started job response sending.
for (int i = 0; i < lastGridIdx; i++)
((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).awaitResponse();
// Now we stop master grid.
stopGrid(lastGridIdx, true);
waitForTopology(GRID_CNT - 1);
// Release communication SPI wait latches. As master node is stopped, job worker will receive and exception.
for (int i = 0; i < lastGridIdx; i++)
((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch();
assert invokeLatch.await(5000, MILLISECONDS);
}
/**
* @throws Exception If failed.
*/
@Test
public void testApply1() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup grid) {
return compute(grid).applyAsync(new TestClosure(), "arg");
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testApply2() throws Exception {
testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup grid) {
return compute(grid).applyAsync(new TestClosure(), Arrays.asList("arg1", "arg2"));
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testApply3() throws Exception {
testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup grid) {
return compute(grid).applyAsync(new TestClosure(),
Arrays.asList("arg1", "arg2"),
new IgniteReducer<Void, Object>() {
@Override public boolean collect(@Nullable Void aVoid) {
return true;
}
@Override public Object reduce() {
return null;
}
});
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testRun1() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).runAsync(new TestRunnable());
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testRun2() throws Exception {
testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).runAsync(Arrays.asList(new TestRunnable(), new TestRunnable()));
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testCall1() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).callAsync(new TestCallable());
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testCall2() throws Exception {
testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).callAsync(Arrays.asList(new TestCallable(), new TestCallable()));
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testCall3() throws Exception {
testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).callAsync(
Arrays.asList(new TestCallable(), new TestCallable()),
new IgniteReducer<Void, Object>() {
@Override public boolean collect(@Nullable Void aVoid) {
return true;
}
@Override public Object reduce() {
return null;
}
});
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testBroadcast1() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).broadcastAsync(new TestRunnable());
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testBroadcast2() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).broadcastAsync(new TestCallable());
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testBroadcast3() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
return compute(prj).broadcastAsync(new TestClosure(), "arg");
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityRun() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
Affinity<Object> aff = prj.ignite().affinity(DEFAULT_CACHE_NAME);
ClusterNode node = F.first(prj.nodes());
return compute(prj).affinityRunAsync(DEFAULT_CACHE_NAME, keyForNode(aff, node), new TestRunnable());
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityCall() throws Exception {
testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, IgniteFuture<?>>() {
@Override public IgniteFuture<?> applyx(ClusterGroup prj) {
Affinity<Object> aff = prj.ignite().affinity(DEFAULT_CACHE_NAME);
ClusterNode node = F.first(prj.nodes());
return compute(prj).affinityCallAsync(DEFAULT_CACHE_NAME, keyForNode(aff, node), new TestCallable());
}
});
}
/**
* @param aff Cache affinity.
* @param node Node.
* @return Finds some cache key for which given node is primary.
*/
private Object keyForNode(Affinity<Object> aff, ClusterNode node) {
assertNotNull(node);
Object key = null;
for (int i = 0; i < 1000; i++) {
if (aff.isPrimary(node, i)) {
key = i;
break;
}
}
assertNotNull(key);
return key;
}
/**
* @param expJobs Expected jobs number.
* @param taskStarter Task started.
* @throws Exception If failed.
*/
private void testMasterLeaveAwareCallback(int expJobs, IgniteClosure<ClusterGroup, IgniteFuture<?>> taskStarter)
throws Exception {
jobLatch = new CountDownLatch(expJobs);
invokeLatch = new CountDownLatch(expJobs);
for (int i = 0; i < GRID_CNT; i++)
startGrid(i);
int lastGridIdx = GRID_CNT - 1;
IgniteFuture<?> fut = taskStarter.apply(grid(lastGridIdx).cluster().forPredicate(excludeLastPredicate()));
jobLatch.await();
stopGrid(lastGridIdx, true);
latch.countDown();
assert invokeLatch.await(5000, MILLISECONDS);
GridTestUtils.assertThrows(log, () -> fut.get(), IgniteException.class, null);
}
/**
*/
private static class TestMasterLeaveAware {
/** */
private final CountDownLatch latch0 = new CountDownLatch(1);
/**
* @param log Logger.
*/
private void execute(IgniteLogger log) {
try {
log.info("Started execute.");
// Countdown shared job latch so that the main thread know that all jobs are
// inside the "execute" routine.
jobLatch.countDown();
log.info("After job latch.");
// Await for the main thread to allow jobs to proceed.
latch.await();
log.info("After latch.");
if (awaitMasterLeaveCallback) {
latch0.await();
log.info("After latch0.");
}
else
log.info("Latch 0 skipped.");
}
catch (InterruptedException e) {
// We do not expect any interruptions here, hence this statement.
fail("Unexpected exception: " + e);
}
}
/**
* @param log Logger.
* @param job Actual job.
*/
private void onMasterLeave(IgniteLogger log, Object job) {
log.info("Callback executed: " + job);
latch0.countDown();
invokeLatch.countDown();
}
}
/**
* Master leave aware callable.
*/
private static class TestCallable implements IgniteCallable<Void>, ComputeJobMasterLeaveAware {
/** Task session. */
@LoggerResource
private IgniteLogger log;
/** */
private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();
/** {@inheritDoc} */
@Override public Void call() throws Exception {
masterLeaveAware.execute(log);
return null;
}
/** {@inheritDoc} */
@Override public void onMasterNodeLeft(ComputeTaskSession ses) {
masterLeaveAware.onMasterLeave(log, this);
}
}
/**
* Master leave aware runnable.
*/
private static class TestRunnable implements IgniteRunnable, ComputeJobMasterLeaveAware {
/** Task session. */
@LoggerResource
private IgniteLogger log;
/** */
private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();
/** {@inheritDoc} */
@Override public void run() {
masterLeaveAware.execute(log);
}
/** {@inheritDoc} */
@Override public void onMasterNodeLeft(ComputeTaskSession ses) {
masterLeaveAware.onMasterLeave(log, this);
}
}
/**
* Master leave aware closure.
*/
private static class TestClosure implements IgniteClosure<String, Void>, ComputeJobMasterLeaveAware {
/** Task session. */
@LoggerResource
private IgniteLogger log;
/** */
private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();
/** {@inheritDoc} */
@Override public Void apply(String arg) {
masterLeaveAware.execute(log);
return null;
}
/** {@inheritDoc} */
@Override public void onMasterNodeLeft(ComputeTaskSession ses) {
masterLeaveAware.onMasterLeave(log, this);
}
}
/**
* Base implementation of dummy task which produces predefined amount of test jobs on split stage.
*/
private static class TestTask extends ComputeTaskSplitAdapter<String, Integer> {
/** How many jobs to produce. */
private int jobCnt;
/** */
@TaskSessionResource
private ComputeTaskSession taskSes;
/**
* Constructor.
*
* @param jobCnt How many jobs to produce on split stage.
*/
private TestTask(int jobCnt) {
this.jobCnt = jobCnt;
}
/** {@inheritDoc} */
@Override protected Collection<? extends ComputeJob> split(int gridSize, String arg) {
Collection<ComputeJobAdapter> jobs = new ArrayList<>(jobCnt);
for (int i = 0; i < jobCnt; i++)
jobs.add(new TestJob());
return jobs;
}
/** {@inheritDoc} */
@Override public Integer reduce(List<ComputeJobResult> results) {
return null;
}
}
/**
* Base implementation of dummy test job.
*/
private static class TestJob extends ComputeJobAdapter implements ComputeJobMasterLeaveAware {
/** Task session. */
@LoggerResource
private IgniteLogger log;
/** */
private TestMasterLeaveAware masterLeaveAware = new TestMasterLeaveAware();
/**
* Constructor
*/
private TestJob() {
super(new Object());
}
/** {@inheritDoc} */
@Override public Object execute() {
masterLeaveAware.execute(log);
return null;
}
/** {@inheritDoc} */
@Override public void onMasterNodeLeft(ComputeTaskSession ses) {
masterLeaveAware.onMasterLeave(log, this);
}
}
/**
* Communication SPI which could optionally block outgoing messages.
*/
private static class CommunicationSpi extends TcpCommunicationSpi {
/** Whether to block all outgoing messages. */
private volatile boolean block;
/** Job execution response latch. */
private CountDownLatch respLatch = new CountDownLatch(1);
/** Whether to wait for a wait latch before returning. */
private volatile boolean wait;
/** Wait latch. */
private CountDownLatch waitLatch = new CountDownLatch(1);
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
throws IgniteSpiException {
sendMessage0(node, msg, ackClosure);
}
/**
* Send message optionally either blocking it or throwing an exception if it is of
* {@link GridJobExecuteResponse} type.
*
* @param node Destination node.
* @param msg Message to be sent.
* @param ackClosure Ack closure.
* @throws org.apache.ignite.spi.IgniteSpiException If failed.
*/
private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
GridIoMessage msg0 = (GridIoMessage)msg;
if (msg0.message() instanceof GridJobExecuteResponse) {
respLatch.countDown();
if (wait) {
try {
U.await(waitLatch);
}
catch (IgniteInterruptedCheckedException ignore) {
// No-op.
}
}
}
}
if (!block)
super.sendMessage(node, msg, ackClosure);
}
/**
* Block all outgoing message.
*/
void blockMessages() {
block = true;
}
/**
* Whether to block on a wait latch.
*/
private void waitLatch() {
wait = true;
}
/**
* Count down wait latch.
*/
private void releaseWaitLatch() {
waitLatch.countDown();
}
/**
* Await for job execution response to come.
*
* @throws IgniteInterruptedCheckedException If interrupted.
*/
private void awaitResponse() throws IgniteInterruptedCheckedException {
U.await(respLatch);
}
}
}