blob: 4342859a2827af81a53321e68e226c56522f9718 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Tests {@link DistributedProcess} in case the coordinator node leaves the cluster.
*/
public class DistributedProcessCoordinatorLeftTest extends GridCommonAbstractTest {
    /** Timeout to wait latches. */
    public static final long TIMEOUT = 20_000L;

    /** Nodes count. */
    public static final int NODES_CNT = 3;

    /** Stop node index. */
    public static final int STOP_NODE_IDX = 0;

    /**
     * Counted down on every node-left/node-failed event. Process execution futures wait for it before completing,
     * so single messages are sent only after the surviving nodes have observed the coordinator leave.
     */
    private final CountDownLatch nodeLeftLatch = new CountDownLatch(NODES_CNT - 1);

    /**
     * Counted down when a non-coordinator node's execution future completes. The node-left event listener waits
     * for it, so the surviving nodes try to send their single messages to the failed coordinator first.
     */
    private final CountDownLatch msgSendLatch = new CountDownLatch(NODES_CNT - 1);

    /** Failure handler invocation flag. */
    private final AtomicBoolean failure = new AtomicBoolean();

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        // Holds each node-left/node-failed notification until the single messages have been sent
        // to the stopped coordinator.
        cfg.setLocalEventListeners(Collections.singletonMap(event -> {
            nodeLeftLatch.countDown();

            awaitLatch(msgSendLatch, "single messages to be sent");

            return false;
        }, new int[] {EVT_NODE_LEFT, EVT_NODE_FAILED}));

        // Records any failure handler invocation; the test asserts that none happened.
        cfg.setFailureHandler(new FailureHandler() {
            @Override public boolean onFailure(Ignite ignite, FailureContext failureCtx) {
                failure.set(true);

                return false;
            }
        });

        return cfg;
    }

    /**
     * Waits up to {@link #TIMEOUT} milliseconds for the latch to reach zero. Bounding the wait keeps a broken
     * scenario from blocking the calling thread (test, listener or async task) forever.
     * Fails the calling thread if the wait times out. Also fails it if the wait is interrupted, after first
     * restoring the thread's interrupt flag.
     *
     * @param latch Latch to wait for.
     * @param desc Description of the awaited condition, used in the failure message.
     */
    private void awaitLatch(CountDownLatch latch, String desc) {
        try {
            if (!latch.await(TIMEOUT, MILLISECONDS))
                fail("Timed out waiting for " + desc + '.');
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            fail("Unexpected interrupt.");
        }
    }

    /**
     * Tests that the coordinator failing while single results are being sent does not cause a node failure,
     * and that the process finishes.
     *
     * <ol>
     * <li>Start new process of {@link DistributedProcess}.</li>
     * <li>The coordinator fails.</li>
     * <li>Nodes try to send a single message to the not-alive coordinator.</li>
     * <li>{@link DistributedProcess} processes a node left event and reinitializes a new coordinator.</li>
     * <li>Process finishes.</li>
     * </ol>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCoordinatorFailed() throws Exception {
        startGrids(NODES_CNT);

        CountDownLatch startLatch = new CountDownLatch(NODES_CNT);
        CountDownLatch finishLatch = new CountDownLatch(NODES_CNT - 1);

        HashMap<String, DistributedProcess<Integer, Integer>> processes = new HashMap<>();

        int processRes = 1;

        for (Ignite grid : G.allGrids()) {
            DistributedProcess<Integer, Integer> dp = new DistributedProcess<>(((IgniteEx)grid).context(), TEST_PROCESS,
                req -> {
                    // Execution completes only after the coordinator has left (see nodeLeftLatch).
                    IgniteInternalFuture<Integer> fut = runAsync(() -> {
                        awaitLatch(nodeLeftLatch, "node left events");

                        return req;
                    });

                    // A single message will be sent before this latch released.
                    // It is guaranteed by the LIFO order of future listeners notifying.
                    if (!grid.name().equals(getTestIgniteInstanceName(STOP_NODE_IDX)))
                        fut.listen(f -> msgSendLatch.countDown());

                    startLatch.countDown();

                    return fut;
                },
                (uuid, res, err) -> {
                    // Only the surviving nodes' results are expected. On any other outcome finishLatch is not
                    // counted down, so the wait on it below fails as well.
                    if (res.values().size() == NODES_CNT - 1 && res.values().stream().allMatch(i -> i == processRes))
                        finishLatch.countDown();
                    else
                        fail("Unexpected process result [res=" + res + ", err=" + err + ']');
                }
            );

            processes.put(grid.name(), dp);
        }

        processes.get(grid(STOP_NODE_IDX).name()).start(UUID.randomUUID(), processRes);

        assertTrue(startLatch.await(TIMEOUT, MILLISECONDS));

        stopGrid(STOP_NODE_IDX);

        assertTrue(finishLatch.await(TIMEOUT, MILLISECONDS));

        assertFalse(failure.get());
    }
}