blob: 513c5a35ae38c65fb280d447a0b36032c346abcd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.discovery.AbstractDiscoverySelfTest;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
/**
 * Tests how {@link TcpDiscoverySpi} behaves with respect to the failure detection timeout.
 * <p>
 * Six SPIs are created: the first two keep their default configuration, while each of the remaining
 * four sets exactly one explicit timeout-related property. The tests expect the failure detection
 * timeout to be enabled for the former and disabled for the latter.
 */
public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelfTest {
    /** Number of SPIs started for the test. */
    private static final int SPI_COUNT = 6;

    /** IP finder shared by all SPIs created in {@link #getSpi(int)}. */
    private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** {@inheritDoc} */
    @Override protected int getSpiCount() {
        return SPI_COUNT;
    }

    /**
     * Creates the SPI for the given index. Indexes 0 and 1 keep the defaults; indexes 2 to 5 each
     * set one explicit property (ack timeout, socket timeout, reconnect count, max ack timeout).
     *
     * @param idx SPI index, expected to be in {@code [0, SPI_COUNT)}.
     * @return Configured SPI.
     */
    @Override protected DiscoverySpi getSpi(int idx) {
        TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();

        spi.setMetricsProvider(createMetricsProvider());
        spi.setIpFinder(ipFinder);

        switch (idx) {
            case 0:
            case 1:
                // Defaults only: no explicit timeout settings.
                break;

            case 2:
                spi.setAckTimeout(3000);

                break;

            case 3:
                spi.setSocketTimeout(4000);

                break;

            case 4:
                spi.setReconnectCount(4);

                break;

            case 5:
                spi.setMaxAckTimeout(10000);

                break;

            default:
                assert false : "Unexpected SPI index: " + idx;
        }

        return spi;
    }

    /**
     * Checks that the SPIs with default configuration report the failure detection timeout as
     * enabled and equal to {@link IgniteConfiguration#DFLT_FAILURE_DETECTION_TIMEOUT}.
     *
     * @throws Exception In case of error.
     */
    public void testFailureDetectionTimeoutEnabled() throws Exception {
        long dfltTimeout = IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue();

        assertTrue(firstSpi().failureDetectionTimeoutEnabled());
        assertTrue(secondSpi().failureDetectionTimeoutEnabled());

        assertEquals(dfltTimeout, firstSpi().failureDetectionTimeout());
        assertEquals(dfltTimeout, secondSpi().failureDetectionTimeout());
    }

    /**
     * Checks that every SPI with an explicitly configured property (indexes 2 and above) reports
     * the failure detection timeout as disabled and equal to zero.
     *
     * @throws Exception In case of error.
     */
    public void testFailureDetectionTimeoutDisabled() throws Exception {
        for (int i = 2; i < spis.size(); i++) {
            TcpDiscoverySpi spi = (TcpDiscoverySpi)spis.get(i);

            assertFalse(spi.failureDetectionTimeoutEnabled());
            assertEquals(0, spi.failureDetectionTimeout());
        }
    }

    /**
     * Checks that pinging fails when opening the socket times out, both when the timeout is raised
     * immediately and when it is raised after waiting longer than the remaining timeout.
     *
     * @throws Exception In case of error.
     */
    public void testFailureDetectionOnSocketOpen() throws Exception {
        try {
            ClusterNode node = secondSpi().getLocalNode();

            // Scenario 1: openSocket throws the timeout exception right away.
            firstSpi().openSocketTimeout = true;

            assertFalse(firstSpi().pingNode(node.id()));
            assertTrue(firstSpi().validTimeout);
            assertEquals("Timeout: openSocketTimeout", firstSpi().err.getMessage());

            // Scenario 2: openSocket sleeps past the timeout before failing.
            firstSpi().openSocketTimeout = false;
            firstSpi().openSocketTimeoutWait = true;

            assertFalse(firstSpi().pingNode(node.id()));
            assertTrue(firstSpi().validTimeout);
            assertEquals("Timeout: openSocketTimeoutWait", firstSpi().err.getMessage());
        }
        finally {
            firstSpi().resetState();
        }
    }

    /**
     * Checks that pinging fails while the ping request write is stalled, succeeds once the stall is
     * removed, and that the timeout passed to the write stays below the default failure detection
     * timeout in both cases.
     *
     * @throws Exception In case of error.
     */
    public void testFailureDetectionOnSocketWrite() throws Exception {
        try {
            ClusterNode node = secondSpi().getLocalNode();

            firstSpi().writeToSocketTimeoutWait = true;

            assertFalse(firstSpi().pingNode(node.id()));
            assertTrue(firstSpi().validTimeout);

            firstSpi().writeToSocketTimeoutWait = false;

            assertTrue(firstSpi().pingNode(node.id()));
            assertTrue(firstSpi().validTimeout);
        }
        finally {
            firstSpi().resetState();
        }
    }

    /**
     * Checks that, over one failure detection timeout interval, the first SPI sends and its next
     * node in the ring receives between 3 and 6 connection check messages.
     *
     * @throws Exception In case of error.
     */
    public void testConnectionCheckMessage() throws Exception {
        TestTcpDiscoverySpi nextSpi = null;

        try {
            assertEquals(0, firstSpi().connCheckStatusMsgCntSent);

            TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();

            assertNotNull(nextNode);

            nextSpi = findSpi(nextNode);

            assertNotNull(nextSpi);
            assertEquals(0, nextSpi.connCheckStatusMsgCntReceived);

            // Count only the messages exchanged during the sleep window.
            firstSpi().countConnCheckMsg = true;
            nextSpi.countConnCheckMsg = true;

            Thread.sleep(firstSpi().failureDetectionTimeout());

            firstSpi().countConnCheckMsg = false;
            nextSpi.countConnCheckMsg = false;

            int sent = firstSpi().connCheckStatusMsgCntSent;
            int received = nextSpi.connCheckStatusMsgCntReceived;

            assertTrue("messages sent: " + sent, sent >= 3 && sent < 7);
            assertTrue("messages received: " + received, received >= 3 && received < 7);
        }
        finally {
            firstSpi().resetState();

            if (nextSpi != null)
                nextSpi.resetState();
        }
    }

    /**
     * Checks that no connection check messages are exchanged with the next node when that node
     * reports a product version whose minor part is one below
     * {@link TcpDiscoverySpi#FAILURE_DETECTION_MINOR_VER}. The node's original version is restored
     * afterwards.
     *
     * @throws Exception In case of error.
     */
    public void testConnectionCheckMessageBackwardCompatibility() throws Exception {
        TestTcpDiscoverySpi nextSpi = null;
        TcpDiscoveryNode nextNode = null;
        IgniteProductVersion nextNodeVer = null;

        try {
            assertEquals(0, firstSpi().connCheckStatusMsgCntSent);

            nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();

            assertNotNull(nextNode);

            nextSpi = findSpi(nextNode);

            assertNotNull(nextSpi);
            assertEquals(0, nextSpi.connCheckStatusMsgCntReceived);

            nextNodeVer = nextNode.version();

            // Override the version of the next node. Connection check messages must not be sent to it.
            nextNode.version(new IgniteProductVersion(
                TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
                (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1),
                TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER,
                0L,
                null));

            firstSpi().countConnCheckMsg = true;
            nextSpi.countConnCheckMsg = true;

            Thread.sleep(firstSpi().failureDetectionTimeout() / 2);

            firstSpi().countConnCheckMsg = false;
            nextSpi.countConnCheckMsg = false;

            int sent = firstSpi().connCheckStatusMsgCntSent;
            int received = nextSpi.connCheckStatusMsgCntReceived;

            assertEquals("messages sent: " + sent, 0, sent);
            assertEquals("messages received: " + received, 0, received);
        }
        finally {
            firstSpi().resetState();

            if (nextSpi != null)
                nextSpi.resetState();

            if (nextNode != null && nextNodeVer != null)
                nextNode.version(nextNodeVer);
        }
    }

    /**
     * Returns the first spi with failure detection timeout enabled.
     *
     * @return SPI.
     */
    private TestTcpDiscoverySpi firstSpi() {
        return (TestTcpDiscoverySpi)spis.get(0);
    }

    /**
     * Returns the second spi with failure detection timeout enabled.
     *
     * @return SPI.
     */
    private TestTcpDiscoverySpi secondSpi() {
        return (TestTcpDiscoverySpi)spis.get(1);
    }

    /**
     * Looks up, among all SPIs except the first one, the SPI whose local node has the same ID as
     * the given node.
     *
     * @param node Node to find the SPI for.
     * @return Matching SPI or {@code null} if there is none.
     */
    private TestTcpDiscoverySpi findSpi(TcpDiscoveryNode node) {
        for (int i = 1; i < spis.size(); i++) {
            if (spis.get(i).getLocalNode().id().equals(node.id()))
                return (TestTcpDiscoverySpi)spis.get(i);
        }

        return null;
    }

    /**
     * Discovery SPI with switches that simulate socket open/write timeouts and with counters for
     * connection check messages.
     * <p>
     * NOTE(review): all fields are volatile, presumably because the test thread toggles them while
     * SPI threads read them; the counters are incremented with a plain {@code ++}, so they are not
     * atomic if several threads write concurrently.
     */
    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
        /** When set, {@link #openSocket} fails immediately with a timeout exception. */
        private volatile boolean openSocketTimeout;

        /** When set, {@link #openSocket} sleeps past the current timeout chunk before failing. */
        private volatile boolean openSocketTimeoutWait;

        /** When set, a ping request write only sleeps for the given timeout and sends nothing. */
        private volatile boolean writeToSocketTimeoutWait;

        /** Enables counting of connection check messages. */
        private volatile boolean countConnCheckMsg;

        /** Number of connection check messages written by this SPI while counting was enabled. */
        private volatile int connCheckStatusMsgCntSent;

        /** Number of connection check messages this SPI wrote a response for while counting was enabled. */
        private volatile int connCheckStatusMsgCntReceived;

        /** Cleared when a ping request is written with a timeout not below the default failure detection timeout. */
        private volatile boolean validTimeout = true;

        /** Last timeout exception raised by {@link #openSocket}. */
        private volatile IgniteSpiOperationTimeoutException err;

        /**
         * Opens the socket, optionally simulating a timeout first. When no timeout is simulated, or
         * the simulated wait did not exhaust the helper, delegates to the parent implementation and
         * then pauses for 1500 ms before returning the socket that was passed in.
         *
         * @param sock Socket to open.
         * @param sockAddr Remote address.
         * @param timeoutHelper Timeout helper.
         * @return The {@code sock} argument.
         * @throws IOException If the parent implementation fails.
         * @throws IgniteSpiOperationTimeoutException If a timeout is simulated or detected.
         */
        @Override protected Socket openSocket(Socket sock, InetSocketAddress sockAddr,
            IgniteSpiOperationTimeoutHelper timeoutHelper)
            throws IOException, IgniteSpiOperationTimeoutException {
            if (openSocketTimeout) {
                err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout");

                throw err;
            }
            else if (openSocketTimeoutWait) {
                long timeout = timeoutHelper.nextTimeoutChunk(0);

                // Sleep one second longer than the remaining chunk so the helper should report a timeout.
                try {
                    Thread.sleep(timeout + 1000);
                }
                catch (InterruptedException e) {
                    // Ignore
                }

                try {
                    timeoutHelper.nextTimeoutChunk(0);
                }
                catch (IgniteSpiOperationTimeoutException e) {
                    err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait");

                    throw err;
                }
            }

            super.openSocket(sock, sockAddr, timeoutHelper);

            try {
                Thread.sleep(1500);
            }
            catch (InterruptedException e) {
                // Ignore
            }

            return sock;
        }

        /**
         * Writes a message to the socket. Messages other than ping requests are passed straight to
         * the parent implementation, and outgoing connection check messages are counted when
         * counting is enabled. For ping requests the timeout is validated against the default
         * failure detection timeout, and the write is optionally replaced by a sleep.
         *
         * @param sock Socket.
         * @param msg Message to write.
         * @param timeout Write timeout.
         * @throws IOException If the parent implementation fails.
         * @throws IgniteCheckedException If the ping request timeout is not below the default
         *      failure detection timeout, or if the parent implementation fails.
         */
        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
            throws IOException, IgniteCheckedException {
            if (!(msg instanceof TcpDiscoveryPingRequest)) {
                // Without this increment the "sent" counter checked by the tests would always stay at zero.
                if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
                    connCheckStatusMsgCntSent++;

                super.writeToSocket(sock, msg, timeout);

                return;
            }

            if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) {
                validTimeout = false;

                throw new IgniteCheckedException("Invalid timeout: " + timeout);
            }

            if (writeToSocketTimeoutWait) {
                // Simulate a stalled write: consume the whole timeout without sending anything.
                try {
                    Thread.sleep(timeout);
                }
                catch (InterruptedException e) {
                    // Ignore
                }
            }
            else
                super.writeToSocket(sock, msg, timeout);
        }

        /**
         * Counts connection check messages when counting is enabled and then delegates to the
         * parent implementation.
         *
         * @param msg Message the result relates to.
         * @param sock Socket.
         * @param res Result value passed through to the parent implementation.
         * @param timeout Write timeout.
         * @throws IOException If the parent implementation fails.
         */
        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
            throws IOException {
            if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
                connCheckStatusMsgCntReceived++;

            super.writeToSocket(msg, sock, res, timeout);
        }

        /**
         * Restores all simulation switches, counters and the recorded error to their initial values.
         */
        private void resetState() {
            openSocketTimeout = false;
            openSocketTimeoutWait = false;
            writeToSocketTimeoutWait = false;

            err = null;
            validTimeout = true;

            connCheckStatusMsgCntSent = 0;
            connCheckStatusMsgCntReceived = 0;
            countConnCheckMsg = false;
        }
    }
}