IGNITE-14607 Regex Based Filtering For Discovery IP Addresses - Fixes #9048.
Signed-off-by: Ilya Kasnacheev <ilya.kasnacheev@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 6417195..0f4fc15 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -72,6 +72,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
@@ -319,6 +320,9 @@
/** IP finder. */
protected TcpDiscoveryIpFinder ipFinder;
+ /** Address filter */
+ private IgnitePredicate<InetSocketAddress> addressFilter;
+
/** Socket operations timeout. */
private long sockTimeout; // Must be initialized in the constructor of child class.
@@ -926,6 +930,22 @@
}
/**
+ * Sets filter for IP addresses. Each address found by {@link TcpDiscoveryIpFinder} will be checked against
+ * this filter and only passing addresses will be used for discovery.
+ * <p>
+ * If not specified or null, all found addresses are used.
+ *
+ * @param addressFilter Address filter to use
+ * @return {@code this} for chaining.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setAddressFilter(IgnitePredicate<InetSocketAddress> addressFilter) {
+ this.addressFilter = addressFilter;
+
+ return this;
+ }
+
+ /**
* Sets socket operations timeout. This timeout is used to limit connection time and
* write-to-socket time.
* <p>
@@ -1994,6 +2014,9 @@
assert addr != null;
try {
+ if (addressFilter != null && !addressFilter.apply(addr))
+ continue;
+
InetSocketAddress resolved = addr.isUnresolved() ?
new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithAddressFilterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithAddressFilterTest.java
new file mode 100644
index 0000000..5c8b251
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithAddressFilterTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.junit.Test;
+
+/**
+ * Test discovery SPI with address filter present
+ */
+public class TcpDiscoveryWithAddressFilterTest extends TcpDiscoveryWithWrongServerTest {
+ /** Address filter predicate which allows filtering IP addresses duringd discovery */
+ private IgnitePredicate<InetSocketAddress> addressFilter = new P1<InetSocketAddress>() {
+ @Override public boolean apply(InetSocketAddress address) {
+ // Compile regular expression
+ Pattern pattern = Pattern.compile("^/127\\.0\\.0\\.1:47503$", Pattern.CASE_INSENSITIVE);
+ // Match regex against input
+ Matcher matcher = pattern.matcher(address.toString());
+ // Use results...
+ return !(matcher.matches());
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+
+ ipFinder.setAddresses(Collections.singleton("127.0.0.1:" + SERVER_PORT + ".." + LAST_SERVER_PORT));
+
+ cfg.setDiscoverySpi(new TcpDiscoveryWithAddressFilter().setIpFinder(ipFinder).setAddressFilter(addressFilter));
+
+ return cfg;
+ }
+
+ /**
+ * Basic test
+ */
+ @Test
+ public void testBasic() throws Exception {
+ startTcpThread(new NoResponseWorker(), SERVER_PORT);
+ startTcpThread(new NoResponseWorker(), LAST_SERVER_PORT);
+
+ simpleTest();
+ }
+
+ /**
+ * Check for incoming addresses and check that the filter was applied
+ */
+ private class TcpDiscoveryWithAddressFilter extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+ Collection<InetSocketAddress> res = super.resolvedAddresses();
+
+ for (InetSocketAddress addr : res)
+ assertFalse(addr.getHostName().matches("^/127\\.0\\.0\\.1:47503$"));
+
+ return res;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
index 09ba74b..df9b6dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java
@@ -43,16 +43,16 @@
*/
public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest {
/** Non-Ignite Server port #1. */
- private static final int SERVER_PORT = 47500;
+ protected static final int SERVER_PORT = 47500;
/** Non-Ignite Server port #2. */
- private static final int LAST_SERVER_PORT = SERVER_PORT + 5;
+ protected static final int LAST_SERVER_PORT = SERVER_PORT + 5;
/** Non-Ignite Server sockets. */
- private List<ServerSocket> srvSocks = new ArrayList<>();
+ protected List<ServerSocket> srvSocks = new ArrayList<>();
/** Count of accepted connections to non-Ignite Server. */
- private AtomicInteger connCnt = new AtomicInteger(0);
+ protected AtomicInteger connCnt = new AtomicInteger(0);
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -81,7 +81,7 @@
* Starts tcp test thread
* @param workerFactory one of WorkerFactory
*/
- private void startTcpThread(final WorkerFactory workerFactory, final int port) throws Exception {
+ protected void startTcpThread(final WorkerFactory workerFactory, final int port) throws Exception {
final ServerSocket srvSock = new ServerSocket(port, 10, InetAddress.getByName("127.0.0.1"));
srvSocks.add(srvSock);
@@ -173,7 +173,7 @@
* It is expected that both client and server could successfully perform Discovery Procedure when there is
* unknown (test) server in the ipFinder list.
*/
- private void simpleTest() {
+ protected void simpleTest() {
try {
Ignite srv = startGrid("server");
Ignite client = startClientGrid("client");
@@ -196,7 +196,7 @@
/**
* Just a factory for runnable workers
*/
- private interface WorkerFactory {
+ protected interface WorkerFactory {
/**
* Creates a new worker for socket
* @param clientSock socket for worker
@@ -248,7 +248,7 @@
/**
* SomeResponseWorker.
*/
- private class SomeResponseWorker implements WorkerFactory {
+ protected class SomeResponseWorker implements WorkerFactory {
/** {@inheritDoc} */
@Override public Runnable newWorker(Socket clientSock) {
return new SocketWorker(clientSock) {
@@ -264,7 +264,7 @@
/**
* NoResponseWorker.
*/
- private class NoResponseWorker implements WorkerFactory {
+ protected class NoResponseWorker implements WorkerFactory {
/** {@inheritDoc} */
@Override public Runnable newWorker(Socket clientSock) {
return new SocketWorker(clientSock) {
@@ -320,7 +320,7 @@
* TcpDiscoverySpi with non-shuffled resolved IP addresses. We should ensure that in this test non-Ignite server
* is the first element of the addresses list
*/
- class TcpDiscoverySpiWithOrderedIps extends TcpDiscoverySpi {
+ protected class TcpDiscoverySpiWithOrderedIps extends TcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
Collection<InetSocketAddress> shuffled = super.resolvedAddresses();
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 8f19f2d..3757ce7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -72,6 +72,7 @@
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedUntrustedTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithAddressFilterTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithWrongServerTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
@@ -136,6 +137,8 @@
TcpDiscoveryWithWrongServerTest.class,
+ TcpDiscoveryWithAddressFilterTest.class,
+
TcpDiscoverySpiReconnectDelayTest.class,
TcpDiscoveryNetworkIssuesTest.class,