IGNITE-11320: Support for individual reconnect in case of best effort affinity mode added.
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
index 888d65e..3937fe2 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.jdbc.suite;
import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessReconnectionSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessTransactionsSelfTest;
import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest;
@@ -38,6 +39,7 @@
JdbcThinStatementSelfTest.class,
JdbcThinAffinityAwarenessSelfTest.class,
JdbcThinAffinityAwarenessTransactionsSelfTest.class,
+ JdbcThinAffinityAwarenessReconnectionSelfTest.class,
})
public class IgniteJdbcThinDriverAffinityAwarenessTestSuite {
/**
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java
new file mode 100644
index 0000000..f612b5b
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection;
+import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Jdbc thin affinity awareness reconnection test.
+ */
+public class JdbcThinAffinityAwarenessReconnectionSelfTest extends JdbcThinAbstractSelfTest {
+ /** URL. */
+ private static final String URL = "jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true";
+
+ /** Nodes count. */
+ private static final int INITIAL_NODES_CNT = 3;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(INITIAL_NODES_CNT);
+ }
+
+ /**
+ * Check that background connection establishment works as expected.
+ * <p>
+ * Within new reconnection logic in affinity awareness mode when {@code JdbcThinConnection} is created
+ * it eagerly establishes a connection to one and only one ignite node. All other connections to nodes specified in
+ * connection properties are established by background thread.
+ * <p>
+ * So in given test we specify url with 3 different ports and verify that 3 connections will be created:
+ * one in eager mode and two within background thread. It takes some time for background thread to create
+ * a connection, and cause, in addition to that it runs periodically with some delay,
+ * {@code GridTestUtils.waitForCondition} is used in order to check that all expected connections are established.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBackgroundConnectionEstablishment() throws Exception {
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+ assertConnectionsCount(ios, 3);
+ }
+ }
+
+ /**
+ * Test connection failover:
+ * <ol>
+ * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
+ * <li>Stop one node, invalidate dead connection (jdbc thin, won't detect that node has gone,
+ * until it tries to touch it) and verify, that connections count has decremented. </li>
+ * <li>Start, previously stopped node, and check that connections count also restored to initial value.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConnectionFailover() throws Exception {
+ try (Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+ assertEquals("Unexpected connections count.", INITIAL_NODES_CNT, ios.size());
+
+ stopGrid(1);
+
+ invalidateConnectionToStoppedNode(conn);
+
+ assertEquals("Unexpected connections count.", INITIAL_NODES_CNT - 1, ios.size());
+
+ startGrid(1);
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+ }
+ }
+
+ /**
+ * Test total connection failover:
+ * <ol>
+ * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
+ * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone,
+ * until it tries to touch it) and verify, that connections count equals to zero. </li>
+ * <li>Start, previously stopped nodes, and check that connections count also restored to initial value.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTotalConnectionFailover() throws Exception {
+ try(Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++) {
+ stopGrid(i);
+ invalidateConnectionToStoppedNode(conn);
+ }
+
+ assertConnectionsCount(ios, 0);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++)
+ startGrid(i);
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+ }
+ }
+
+ /**
+ * Test eager connection failover:
+ * <ol>
+ * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
+ * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone,
+ * until it tries to touch it) and verify, that connections count equals to zero. </li>
+ * <li>Wait for some time, in order for reconnection thread to increase delay between connection attempts,
+ * because of reconnection failures.</li>
+ * <li>Start, previously stopped nodes, and send simple query immediately. Eager reconnection is expected.
+ * <b>NOTE</b>:There's still a chance that connection would be recreated by background thread and not eager process.
+ * In order to decrease given possibility we've waited for some time on previous step.</li>
+ * <li>Ensure that after some time all connections will be restored.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testEagerConnectionFailover() throws Exception {
+ try(Connection conn = DriverManager.getConnection(URL)) {
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++) {
+ stopGrid(i);
+ invalidateConnectionToStoppedNode(conn);
+ }
+
+ assertEquals("Unexpected connections count.", 0, ios.size());
+
+ doSleep(4 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ for (int i = 0; i < INITIAL_NODES_CNT; i++)
+ startGrid(i);
+
+ conn.createStatement().execute("select 1;");
+
+ assertConnectionsCount(ios, INITIAL_NODES_CNT);
+ }
+ }
+
+ /**
+ * Check that reconnection thread increases delay between unsuccessful connection attempts:
+ * <ol>
+ * <li>Specify two inet addresses one valid and one inoperative.</li>
+ * <li>Wait for specific amount of time. The reconnection logic suppose to increase delays between reconnection
+ * attempts. The basic idea is very simple: delay is doubled on evey connection failure until connection succeeds
+ * or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY}
+ * <pre>
+ * |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
+ * where: '|' is connection attempt;
+ * '_' is an amount of time that reconnection tread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
+ *
+ * so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four connection attempts:
+ * |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
+ * </pre>
+ * </li>
+ * <li>Check that there were exact four reconnection attempts. In order to do this, we check logs, expecting to see
+ * four warning messages there.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReconnectionDelayIncreasing() throws Exception {
+ Logger log = Logger.getLogger(JdbcThinConnection.class.getName());
+ LogHandler hnd = new LogHandler();
+ hnd.setLevel(Level.ALL);
+ log.setUseParentHandlers(false);
+ log.addHandler(hnd);
+ log.setLevel(Level.ALL);
+
+ try (Connection ignored = DriverManager.getConnection(
+ "jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810?affinityAwareness=true")) {
+ hnd.records.clear();
+
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ assertEquals("Unexpected log records count.", 4, hnd.records.size());
+
+ String expRecordMsg = "Failed to connect to Ignite node " +
+ "[url=jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810]. address = [localhost/127.0.0.1:10810].";
+
+ for (LogRecord record: hnd.records) {
+ assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage());
+ assertEquals("Unexpected log level", Level.WARNING, record.getLevel());
+ }
+ }
+ }
+
+ /**
+ * Check that reconnection thread selectively increases delay between unsuccessful connection attempts:
+ * <ol>
+ * <li>Create {@code JdbcThinConnection} with two valid inet addresses.</li>
+ * <li>Stop one node and invalidate corresponding connection. Ensure that only one connection left.</li>
+ * <li>Wait for specific amount of time. The reconnection logic suppose to increase delays between reconnection
+ * attempts. The basic idea is very simple: delay is doubled on evey connection failure until connection succeeds
+ * or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY}
+ * <pre>
+ * |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
+ * where: '|' is connection attempt;
+ * '_' is an amount of time that reconnection tread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
+ *
+ * so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four connection attempts:
+ * |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
+ * </pre>
+ * </li>
+ * <li>Check that there were exact four reconnection attempts. In order to do this, we check logs, expecting to see
+ * four warning messages there.</li>
+ * <li>Start previously stopped node.</li>
+ * <li>Wait until next reconnection attempt.</li>
+ * <li>Check that both connections are established and that there are no warning messages within logs.</li>
+ * <li>One more time: stop one node and invalidate corresponding connection.
+ * Ensure that only one connection left.</li>
+ * <li>Wait for some time.</li>
+ * <li>Ensure that delay between reconnection was reset to initial value.
+ * In other words, we again expect four warning messages within logs.</li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReconnectionDelaySelectiveIncreasing() throws Exception {
+ Logger log = Logger.getLogger(JdbcThinConnection.class.getName());
+ LogHandler hnd = new LogHandler();
+ hnd.setLevel(Level.ALL);
+ log.setUseParentHandlers(false);
+ log.addHandler(hnd);
+ log.setLevel(Level.ALL);
+
+ try (Connection conn = DriverManager.getConnection(
+ "jdbc:ignite:thin://127.0.0.1:10800..10801?affinityAwareness=true")) {
+ // Stop one node and invalidate corresponding connection. Ensure that only one connection left.
+ stopGrid(0);
+
+ invalidateConnectionToStoppedNode(conn);
+
+ Map<UUID, JdbcThinTcpIo> ios = GridTestUtils.getFieldValue(conn, "ios");
+
+ assertEquals("Unexpected connections count.", 1, ios.size());
+
+ hnd.records.clear();
+
+ // Wait for some specific amount of time and ensure that there were exact four reconnection attempts.
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ assertEquals("Unexpected log records count.", 4, hnd.records.size());
+
+ String expRecordMsg = "Failed to connect to Ignite node [url=jdbc:ignite:thin://127.0.0.1:10800..10801]." +
+ " address = [localhost/127.0.0.1:10800].";
+
+ for (LogRecord record: hnd.records) {
+ assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage());
+ assertEquals("Unexpected log level", Level.WARNING, record.getLevel());
+ }
+
+ // Start previously stopped node.
+ startGrid(0);
+
+ hnd.records.clear();
+
+ // Waiting until next reconnection attempt.
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ // Checking that both connections are established and that there are no warning messages within logs.
+ assertEquals("Unexpected log records count.", 0, hnd.records.size());
+
+ assertEquals("Unexpected connections count.", 2, ios.size());
+
+ // One more time: stop one node, invalidate corresponding connection and ensure that only one connection
+ // left.
+ stopGrid(0);
+
+ invalidateConnectionToStoppedNode(conn);
+
+ assertEquals("Unexpected connections count.", 1, ios.size());
+
+ hnd.records.clear();
+
+ // Wait for some time and ensure that delay between reconnection was reset to initial value.
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY);
+
+ assertEquals("Unexpected log records count.", 4, hnd.records.size());
+
+ for (LogRecord record: hnd.records) {
+ assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage());
+ assertEquals("Unexpected log level", Level.WARNING, record.getLevel());
+ }
+
+ startGrid(0);
+ }
+ }
+
+ /**
+ * Assert connections count.
+ *
+ * @param ios Map that holds connections.
+ * @param expConnCnt Expected connections count.
+ */
+ private void assertConnectionsCount(Map<UUID, JdbcThinTcpIo> ios, int expConnCnt)
+ throws IgniteInterruptedCheckedException {
+ boolean allConnectionsEstablished = GridTestUtils.waitForCondition(() -> ios.size() == expConnCnt,
+ 10_000);
+
+ assertTrue("Unexpected connections count.", allConnectionsEstablished);
+ }
+
+ /**
+ * Invalidate connection to stopped node. Jdbc thin, won't detect that node has gone, until it tries to touch it.
+ * So sending simple query to randomly chosen connection(socket), sooner or later, will touch dead one,
+ * and thus invalidate it.
+ *
+ * @param conn Connections.
+ */
+ private void invalidateConnectionToStoppedNode(Connection conn) {
+ while (true) {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute("select 1");
+ }
+ catch (SQLException e) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Simple {@code java.util.logging.Handler} implementation in order to check log records
+ * generated by {@code JdbcThinConnection}.
+ */
+ static class LogHandler extends Handler {
+
+ /** Log records. */
+ private final List<LogRecord> records = new ArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override public void publish(LogRecord record) {
+ records.add(record);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void flush() {
+ }
+
+ /**
+ * @return Records.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public List<LogRecord> records() {
+ return records;
+ }
+ }
+}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
index 22e5d0d..f0e632c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessSelfTest.java
@@ -30,7 +30,6 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -133,14 +132,14 @@
*/
@Test
public void testExecuteQueries() throws Exception {
- checkNodesUsage(null, "select * from Person where _key = 1", 1, 1,
+ checkNodesUsage(null, stmt, "select * from Person where _key = 1", 1, 1,
false);
- checkNodesUsage(null, "select * from Person where _key = 1 or _key = 2", 2,
+ checkNodesUsage(null, stmt, "select * from Person where _key = 1 or _key = 2", 2,
2, false);
- checkNodesUsage(null, "select * from Person where _key in (1, 2)", 2, 2,
- false);
+ checkNodesUsage(null, stmt, "select * from Person where _key in (1, 2)", 2,
+ 2, false);
}
/**
@@ -155,7 +154,7 @@
ps.setInt(1, 2);
- checkNodesUsage(ps, null, 1, 1, false);
+ checkNodesUsage(ps, null, null, 1, 1, false);
// Use case 2.
ps = conn.prepareStatement("select * from Person where _key = ? or _key = ?");
@@ -164,7 +163,7 @@
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, false);
+ checkNodesUsage(ps, null, null, 2, 2, false);
// Use case 3.
ps = conn.prepareStatement("select * from Person where _key in (?, ?)");
@@ -173,7 +172,7 @@
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, false);
+ checkNodesUsage(ps, null, null, 2, 2, false);
}
/**
@@ -183,13 +182,13 @@
*/
@Test
public void testUpdateQueries() throws Exception {
- checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key = 1",
+ checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key = 1",
1, 1, true);
- checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key = 1 or _key = 2",
+ checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key = 1 or _key = 2",
2, 2, true);
- checkNodesUsage(null, "update Person set firstName = 'TestFirstName' where _key in (1, 2)",
+ checkNodesUsage(null, stmt, "update Person set firstName = 'TestFirstName' where _key in (1, 2)",
2, 2, true);
}
@@ -206,7 +205,7 @@
ps.setInt(1, 2);
- checkNodesUsage(ps, null, 1, 1, true);
+ checkNodesUsage(ps, null, null, 1, 1, true);
// Use case 2.
ps = conn.prepareStatement("update Person set firstName = 'TestFirstName' where _key = ? or _key = ?");
@@ -215,7 +214,7 @@
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, true);
+ checkNodesUsage(ps, null, null, 2, 2, true);
// Use case 3.
ps = conn.prepareStatement("update Person set firstName = 'TestFirstName' where _key in (?, ?)");
@@ -224,7 +223,7 @@
ps.setInt(2, 2);
- checkNodesUsage(ps, null, 2, 2, true);
+ checkNodesUsage(ps, null, null, 2, 2, true);
}
/**
@@ -235,12 +234,12 @@
@Test
public void testDeleteQueries() throws Exception {
// In case of simple query like "delete from Person where _key = 1" fast update logic is used,
- // so parition result is not calculated on the server side - nothing to check.
+ // so partition result is not calculated on the server side - nothing to check.
- checkNodesUsage(null, "delete from Person where _key = 10000 or _key = 20000",
+ checkNodesUsage(null, stmt, "delete from Person where _key = 10000 or _key = 20000",
2, 0, true);
- checkNodesUsage(null, "delete from Person where _key in (10000, 20000)",
+ checkNodesUsage(null, stmt, "delete from Person where _key in (10000, 20000)",
2, 0, true);
}
@@ -252,7 +251,7 @@
@Test
public void testDeleteParametrizedQueries() throws Exception {
// In case of simple query like "delete from Person where _key = ?" fast update logic is used,
- // so parition result is not calculated on the server side - nothing to check.
+ // so partition result is not calculated on the server side - nothing to check.
// Use case 1.
PreparedStatement ps = conn.prepareStatement("delete from Person where _key = ? or _key = ?");
@@ -261,7 +260,7 @@
ps.setInt(2, 2000);
- checkNodesUsage(ps, null, 2, 0, true);
+ checkNodesUsage(ps, null, null, 2, 0, true);
// Use case 2.
ps = conn.prepareStatement("delete from Person where _key in (?, ?)");
@@ -270,7 +269,7 @@
ps.setInt(2, 2000);
- checkNodesUsage(ps, null, 2, 0, true);
+ checkNodesUsage(ps, null, null, 2, 0, true);
}
/**
@@ -352,14 +351,14 @@
fillCache(cacheName);
- checkNodesUsage(null,
+ checkNodesUsage(null, stmt,
"select * from \"" + cacheName + "\".Person where _key = 1",
1, 1, false);
}
/**
* Check that affinity cache is invalidated in case of changing topology,
- * detected during partions destribution retrieval.
+ * detected during partitions distribution retrieval.
*
* @throws Exception If failed.
*/
@@ -483,7 +482,7 @@
* @throws Exception If failed.
*/
@Test
- public void testAffinityCacheStoresSchemaBindedQuries() throws Exception {
+ public void testAffinityCacheStoresSchemaBindedQueries() throws Exception {
final String cacheName = "yacc";
CacheConfiguration<Object, Object> cache = prepareCacheConfig(cacheName);
@@ -515,12 +514,12 @@
}
/**
- * Check that affinity cache stores compacted version of partitoins destributions.
+ * Check that affinity cache stores compacted version of partitions distributions.
*
* @throws Exception If failed.
*/
@Test
- public void testAffinityCacheCompactsPartitonDestributions() throws Exception {
+ public void testAffinityCacheCompactsPartitionDistributions() throws Exception {
final String cacheName = "yaccc";
CacheConfiguration<Object, Object> cache = prepareCacheConfig(cacheName);
@@ -546,56 +545,16 @@
assertEquals("Sql sub-cache of affinity cache has unexpected number of elements.",
2, sqlCache.size());
- assertEquals("Partitions destribution sub-cache of affinity cache has unexpected number of elements.",
+ assertEquals("Partitions distribution sub-cache of affinity cache has unexpected number of elements.",
2, cachePartitionsDistribution.size());
- // Main assertition of the test: we are checking that partitions destributions for different caches
+ // Main assertion of the test: we are checking that partitions distributions for different caches
// are equal in therms of (==)
assertTrue("Partitions distributions are not the same.",
cachePartitionsDistribution.get(0) == cachePartitionsDistribution.get(1));
}
/**
- * Check that affinity awareness works fine after reconnection.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testReconnect() throws Exception {
- checkNodesUsage(null, "select * from Person where _key = 3", 1, 1,
- false);
-
- startGrid(7);
-
- for(int i = 0; i < NODES_CNT; i++)
- stopGrid(i);
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- stmt.execute("select * from Person where _key = 3");
-
- return null;
- }
- }, SQLException.class, "Failed to communicate with Ignite cluster.");
-
- for(int i = 0; i < NODES_CNT; i++)
- startGrid(i);
-
- stopGrid(4);
- stopGrid(5);
- stopGrid(6);
- stopGrid(7);
-
- stmt = conn.createStatement();
-
- // We need this extra query to invalidate obsolete affinity cache
- stmt.execute("select * from Person where _key = 3");
-
- checkNodesUsage(null, "select * from Person where _key = 3", 1, 1,
- false);
- }
-
- /**
* Prepares default cache configuration with given name.
*
* @param cacheName Cache name.
@@ -607,6 +566,7 @@
cache.setName(cacheName);
cache.setCacheMode(PARTITIONED);
+ cache.setBackups(1);
cache.setIndexedTypes(
Integer.class, Person.class
);
@@ -615,8 +575,8 @@
}
/**
- * Utitlity method that executes given query and verifies that expeted number of records was returned.
- * Besides that given method verified that partitoin result for corresponding query is null.
+ * Utility method that executes given query and verifies that expected number of records was returned.
+ * Besides that given method verified that partition result for corresponding query is null.
*
* @param sqlQry Sql query.
* @param expRowsCnt Expected rows count.
@@ -656,8 +616,8 @@
* @param dml Flag that signals whether we execute dml or not.
* @throws Exception If failed.
*/
- private void checkNodesUsage(PreparedStatement ps, String sql, int maxNodesUsedCnt, int expRowsCnt, boolean dml)
- throws Exception {
+ private void checkNodesUsage(PreparedStatement ps, Statement stmt, String sql, int maxNodesUsedCnt, int expRowsCnt,
+ boolean dml) throws Exception {
// Warm up an affinity cache.
if (ps != null)
if (dml)
@@ -729,7 +689,7 @@
"], got [" + nonEmptyMetricsCntr + "]",
nonEmptyMetricsCntr > 0 && nonEmptyMetricsCntr <= maxNodesUsedCnt);
- assertEquals("Executions count doesn't match expeted value: expected [" +
+ assertEquals("Executions count doesn't match expected value: expected [" +
NODES_CNT * QUERY_EXECUTION_MULTIPLIER + "], got [" + qryExecutionsCntr + "]",
NODES_CNT * QUERY_EXECUTION_MULTIPLIER, qryExecutionsCntr);
}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 243f5c4..185fa91 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -286,8 +286,9 @@
*/
@Test
public void testSqlHints() throws Exception {
- try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
- assertHints(conn, false, false, false, false, false, false, affinityAwareness);
+ try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag)) {
+ assertHints(conn, false, false, false, false, false,
+ false, affinityAwareness);
}
try (Connection conn = DriverManager.getConnection(urlWithAffinityAwarenessFlag + "&distributedJoins=true")) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
index d582ede..bd4dc4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/AffinityCache.java
@@ -107,7 +107,7 @@
* @param cacheId Cache Id.
* @return Cache partitoins distribution for given cache Id or null.
*/
- UUID[] cacheDistribution(int cacheId) {
+ public UUID[] cacheDistribution(int cacheId) {
return cachePartitionsDistribution.get(cacheId);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index 065bac9..971acdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -43,22 +43,27 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.jdbc2.JdbcUtils;
@@ -107,6 +112,12 @@
/** Request timeout period. */
private static final int REQUEST_TIMEOUT_PERIOD = 1_000;
+ /** Reconnection period. */
+ public static final int RECONNECTION_DELAY = 200;
+
+ /** Reconnection maximum period. */
+ private static final int RECONNECTION_MAX_DELAY = 300_000;
+
/** Network timeout permission */
private static final String SET_NETWORK_TIMEOUT_PERM = "setNetworkTimeout";
@@ -149,15 +160,12 @@
/** Connection properties. */
private final ConnectionProperties connProps;
- /** Connected. */
- private volatile boolean connected;
+ /** The amount of potentially alive {@code JdbcThinTcpIo} instances - connections to server nodes. */
+ private final AtomicInteger connCnt = new AtomicInteger();
/** Tracked statements to close on disconnect. */
private final Set<JdbcThinStatement> stmts = Collections.newSetFromMap(new IdentityHashMap<>());
- /** Query timeout timer */
- private final Timer timer;
-
/** Affinity cache. */
private AffinityCache affinityCache;
@@ -165,10 +173,7 @@
private volatile JdbcThinTcpIo singleIo;
/** Node Ids tp ignite endpoints. */
- private final Map<UUID, JdbcThinTcpIo> ios = new ConcurrentHashMap<>();
-
- /** Ignite endpoints to use for better performance in case of random access. */
- private JdbcThinTcpIo[] iosArr;
+ private final ConcurrentSkipListMap<UUID, JdbcThinTcpIo> ios = new ConcurrentSkipListMap<>();
/** Server index. */
private int srvIdx;
@@ -188,6 +193,15 @@
/** Network timeout. */
private int netTimeout;
+ /** Background periodical maintenance: query timeouts and reconnection handler. */
+ private final ScheduledExecutorService maintenanceExecutor = Executors.newScheduledThreadPool(2);
+
+ /** Cancelable future for query timeout task. */
+ private ScheduledFuture<?> qryTimeoutScheduledFut;
+
+ /** Cancelable future for connections handler task. */
+ private ScheduledFuture<?> connectionsHndScheduledFut;
+
/**
* Creates new connection.
*
@@ -203,32 +217,30 @@
schema = JdbcUtils.normalizeSchema(connProps.getSchema());
- timer = new Timer("query-timeout-timer");
-
affinityAwareness = connProps.isAffinityAwareness();
ensureConnected();
+
+ if (affinityAwareness)
+ connectionsHndScheduledFut = maintenanceExecutor.scheduleWithFixedDelay(new ConnectionHandlerTask(),
+ 0, RECONNECTION_DELAY, TimeUnit.MILLISECONDS);
}
/**
* @throws SQLException On connection error.
*/
private void ensureConnected() throws SQLException {
- if (connected)
+ if (connCnt.get() > 0)
return;
assert !closed;
assert ios.isEmpty();
- assert iosArr == null;
-
- HostAndPortRange[] srvs = connProps.getAddresses();
-
if (affinityAwareness)
- connectInAffinityAwarenessMode(srvs);
+ connectInBestEffortAffinityMode();
else
- connectInCommonMode(srvs);
+ connectInCommonMode();
}
/**
@@ -445,6 +457,10 @@
if (isClosed())
return;
+ closed = true;
+
+ maintenanceExecutor.shutdown();
+
if (streamState != null) {
streamState.close();
@@ -457,23 +473,17 @@
SQLException err = null;
- closed = true;
-
if (affinityAwareness) {
for (JdbcThinTcpIo clioIo : ios.values())
clioIo.close();
ios.clear();
-
- iosArr = null;
}
else {
if (singleIo != null)
singleIo.close();
}
- timer.cancel();
-
if (err != null)
throw err;
}
@@ -858,7 +868,7 @@
throws SQLException {
ensureConnected();
- RequestTimeoutTimerTask reqTimeoutTimerTask = null;
+ RequestTimeoutTask reqTimeoutTask = null;
synchronized (mux) {
if (ownThread != null) {
@@ -870,16 +880,18 @@
ownThread = Thread.currentThread();
}
try {
+ JdbcThinTcpIo cliIo = null;
try {
- JdbcThinTcpIo cliIo = stickyIo == null ? cliIo(calculateNodeIds(req)) : stickyIo;
+ cliIo = (stickyIo == null || !stickyIo.connected()) ? cliIo(calculateNodeIds(req)) : stickyIo;
if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) {
- reqTimeoutTimerTask = new RequestTimeoutTimerTask(
+ reqTimeoutTask = new RequestTimeoutTask(
req instanceof JdbcBulkLoadBatchRequest ? stmt.currentRequestId() : req.requestId(),
cliIo,
stmt.requestTimeout());
- timer.schedule(reqTimeoutTimerTask, 0, REQUEST_TIMEOUT_PERIOD);
+ qryTimeoutScheduledFut = maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0,
+ REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS);
}
JdbcQueryExecuteRequest qryReq = null;
@@ -892,13 +904,15 @@
txIo = res.activeTransaction() ? cliIo : null;
if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null &&
- stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null && reqTimeoutTimerTask.expired.get()) {
+ stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null &&
+ reqTimeoutTask.expired.get()) {
throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED,
IgniteQueryErrorCode.QUERY_CANCELED);
}
else if (res.status() != ClientListenerResponse.STATUS_SUCCESS)
- throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), res.status());
+ throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()),
+ res.status());
updateAffinityCache(qryReq, res);
@@ -908,16 +922,17 @@
throw e;
}
catch (Exception e) {
- onDisconnect();
+ onDisconnect(cliIo);
if (e instanceof SocketTimeoutException)
throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e);
else
- throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+ throw new SQLException("Failed to communicate with Ignite cluster.",
+ SqlStateCode.CONNECTION_FAILURE, e);
}
finally {
- if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null)
- reqTimeoutTimerTask.cancel();
+ if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null)
+ qryTimeoutScheduledFut.cancel(false);
}
}
finally {
@@ -932,7 +947,7 @@
*
* @param req Jdbc request for which we'll try to calculate node id.
* @return node UUID or null if failed to calculate.
- * @throws IOException If Exception occured during the network partiton destribution retrieval.
+ * @throws IOException If Exception occurred during the network partition distribution retrieval.
* @throws SQLException If Failed to calculate derived partitions.
*/
@Nullable private List<UUID> calculateNodeIds(JdbcRequest req) throws IOException, SQLException {
@@ -982,12 +997,12 @@
}
/**
- * Retrieve cache destribution for specified cache Id.
+ * Retrieve cache distribution for specified cache Id.
*
* @param cacheId Cache Id.
- * @param partCnt Partitons count.
+ * @param partCnt Partitions count.
* @return Partitions cache distribution.
- * @throws IOException If Exception occured during the network partiton destribution retrieval.
+ * @throws IOException If Exception occurred during the network partition distribution retrieval.
*/
private UUID[] retrieveCacheDistribution(int cacheId, int partCnt) throws IOException {
UUID[] cacheDistr = affinityCache.cacheDistribution(cacheId);
@@ -997,7 +1012,8 @@
JdbcResponse res;
- res = cliIo(null).sendRequest(new JdbcCachePartitionsRequest(Collections.singleton(cacheId)), null);
+ res = cliIo(null).sendRequest(new JdbcCachePartitionsRequest(Collections.singleton(cacheId)),
+ null);
assert res.status() == ClientListenerResponse.STATUS_SUCCESS;
@@ -1007,7 +1023,7 @@
affinityCache = new AffinityCache(resAffinityVer);
else if (affinityCache.version().compareTo(resAffinityVer) > 0) {
// Jdbc thin affinity cache is binded to the newer affinity topology version, so we should ignore retrieved
- // partition destribution. Given situation might occur in case of concurrent race and is not
+ // partition distribution. Given situation might occur in case of concurrent race and is not
// possible in single-threaded jdbc thin client, so it's a reserve for the future.
return null;
}
@@ -1015,7 +1031,7 @@
List<JdbcThinAffinityAwarenessMappingGroup> mappings =
((JdbcCachePartitionsResult)res.response()).getMappings();
- // Despite the fact that, at this moment, we request partition destribution only for one cache,
+ // Despite the fact that, at this moment, we request partition distribution only for one cache,
// we might retrieve multiple caches but exactly with same distribution.
assert mappings.size() == 1;
@@ -1046,7 +1062,8 @@
return derivedParts.tree().apply(partResDesc.partitionClientContext(), args);
}
catch (IgniteCheckedException e) {
- throw new SQLException("Failed to calculate derived partitions for query.", SqlStateCode.INTERNAL_ERROR);
+ throw new SQLException("Failed to calculate derived partitions for query.",
+ SqlStateCode.INTERNAL_ERROR);
}
}
@@ -1061,7 +1078,7 @@
* @throws SQLException On any error.
*/
void sendQueryCancelRequest(JdbcQueryCancelRequest req, JdbcThinTcpIo cliIo) throws SQLException {
- if (!connected)
+ if (connCnt.get() == 0)
throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE);
assert cliIo != null;
@@ -1082,7 +1099,8 @@
* @param stickyIO Sticky ignite endpoint.
* @throws SQLException On any error.
*/
- private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req, JdbcThinTcpIo stickyIO) throws SQLException {
+ private void sendRequestNotWaitResponse(JdbcOrderedBatchExecuteRequest req, JdbcThinTcpIo stickyIO)
+ throws SQLException {
ensureConnected();
synchronized (mux) {
@@ -1102,12 +1120,13 @@
throw e;
}
catch (Exception e) {
- onDisconnect();
+ onDisconnect(stickyIO);
if (e instanceof SocketTimeoutException)
throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e);
else
- throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e);
+ throw new SQLException("Failed to communicate with Ignite cluster.",
+ SqlStateCode.CONNECTION_FAILURE, e);
}
finally {
synchronized (mux) {
@@ -1126,24 +1145,20 @@
/**
* Called on IO disconnect: close the client IO and opened statements.
*/
- private void onDisconnect() {
- if (!connected)
- return;
+ private void onDisconnect(JdbcThinTcpIo cliIo) {
+ assert connCnt.get() > 0;
if (affinityAwareness) {
- for (JdbcThinTcpIo clioIo : ios.values())
- clioIo.close();
+ cliIo.close();
- ios.clear();
-
- iosArr = null;
+ ios.remove(cliIo.nodeId());
}
else {
if (singleIo != null)
singleIo.close();
}
- connected = false;
+ connCnt.decrementAndGet();
if (streamState != null) {
streamState.close0();
@@ -1157,8 +1172,6 @@
stmts.clear();
}
-
- timer.cancel();
}
/**
@@ -1308,7 +1321,7 @@
if (err0 instanceof SQLException)
throw (SQLException)err0;
else {
- onDisconnect();
+ onDisconnect(streamingStickyIo);
if (err0 instanceof SocketTimeoutException)
throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, err0);
@@ -1330,7 +1343,7 @@
/**
*/
void close0() {
- if (connected) {
+ if (connCnt.get() > 0) {
try {
executeBatch(true);
}
@@ -1395,7 +1408,6 @@
* @param nodeIds Set of node's UUIDs.
* @return Ignite endpoint to use for request/response transferring.
*/
- @SuppressWarnings("ZeroLengthArrayAllocation")
private JdbcThinTcpIo cliIo(List<UUID> nodeIds) {
if (!affinityAwareness)
return singleIo;
@@ -1404,12 +1416,12 @@
return txIo;
if (nodeIds == null || nodeIds.isEmpty())
- return iosArr[RND.nextInt(iosArr.length)];
+ return randomIo();
JdbcThinTcpIo io = null;
if (nodeIds.size() == 1)
- io = ios.get(nodeIds.iterator().next());
+ io = ios.get(nodeIds.get(0));
else {
int initNodeId = RND.nextInt(nodeIds.size());
@@ -1427,7 +1439,42 @@
}
}
- return io != null ? io : iosArr[RND.nextInt(iosArr.length)];
+ return io != null ? io : randomIo();
+ }
+
+ /**
+ * Returns random tcpIo, based on random UUID, generated in a custom way with the help of {@code Random}
+ * instead of {@code SecureRandom}. It's valid, cause cryptographically strong pseudo
+ * random number generator is not required in this particular case. {@code Random} is much faster
+ * than {@code SecureRandom}.
+ *
+ * @return random tcpIo
+ */
+ private JdbcThinTcpIo randomIo() {
+ byte[] randomBytes = new byte[16];
+
+ RND.nextBytes(randomBytes);
+
+ randomBytes[6] &= 0x0f; /* clear version */
+ randomBytes[6] |= 0x40; /* set to version 4 */
+ randomBytes[8] &= 0x3f; /* clear variant */
+ randomBytes[8] |= 0x80; /* set to IETF variant */
+
+ long msb = 0;
+
+ long lsb = 0;
+
+ for (int i=0; i<8; i++)
+ msb = (msb << 8) | (randomBytes[i] & 0xff);
+
+ for (int i=8; i<16; i++)
+ lsb = (lsb << 8) | (randomBytes[i] & 0xff);
+
+ UUID randomUUID = new UUID(msb, lsb);
+
+ Map.Entry<UUID, JdbcThinTcpIo> entry = ios.ceilingEntry(randomUUID);
+
+ return entry != null ? entry.getValue() : ios.floorEntry(randomUUID).getValue();
}
/**
@@ -1457,10 +1504,11 @@
* Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one.
* Stops as soon as any connection is established.
*
- * @param srvs Ignite endpoints addresses.
* @throws SQLException If failed to connect to ignite cluster.
*/
- private void connectInCommonMode(HostAndPortRange[] srvs) throws SQLException {
+ private void connectInCommonMode() throws SQLException {
+ HostAndPortRange[] srvs = connProps.getAddresses();
+
List<Exception> exceptions = null;
for (int i = 0; i < srvs.length; i++) {
@@ -1481,7 +1529,7 @@
singleIo = cliIo;
- connected = true;
+ connCnt.incrementAndGet();
return;
}
@@ -1513,7 +1561,7 @@
* @throws SQLException Umbrella exception.
*/
private void handleConnectExceptions(List<Exception> exceptions) throws SQLException {
- if (!connected && exceptions != null) {
+ if (connCnt.get() == 0 && exceptions != null) {
close();
if (exceptions.size() == 1) {
@@ -1540,18 +1588,16 @@
* Establishes a connection to ignite endpoint, trying all specified hosts and ports one by one.
* Stops as soon as all iosArr are established.
*
- * @param srvs Ignite endpoints addresses.
* @throws SQLException If failed to connect to at least one ignite endpoint,
* or if endpoints versions are not the same.
*/
- @SuppressWarnings("ZeroLengthArrayAllocation")
- private void connectInAffinityAwarenessMode(HostAndPortRange[] srvs) throws SQLException {
+ private void connectInBestEffortAffinityMode() throws SQLException {
List<Exception> exceptions = null;
- IgniteProductVersion prevIgniteEnpointVer = null;
+ IgniteProductVersion prevIgniteEndpointVer = null;
- for (int i = 0; i < srvs.length; i++) {
- HostAndPortRange srv = srvs[i];
+ for (int i = 0; i < connProps.getAddresses().length; i++) {
+ HostAndPortRange srv = connProps.getAddresses()[i];
try {
InetAddress[] addrs = InetAddress.getAllByName(srv.host());
@@ -1563,14 +1609,18 @@
new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0);
if (!cliIo.isAffinityAwarenessSupported()) {
+ cliIo.close();
+
throw new SQLException("Failed to connect to Ignite node [url=" +
connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
- "Node doesn't support best affort affinity mode.",
+ "Node doesn't support affinity awareness mode.",
SqlStateCode.INTERNAL_ERROR);
}
- if (prevIgniteEnpointVer != null && !prevIgniteEnpointVer.equals(cliIo.igniteVersion())) {
+ if (prevIgniteEndpointVer != null && !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) {
// TODO: 13.02.19 IGNITE-11321 JDBC Thin: implement nodes multi version support.
+ cliIo.close();
+
throw new SQLException("Failed to connect to Ignite node [url=" +
connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
"Different versions of nodes are not supported in affinity awareness mode.",
@@ -1579,17 +1629,18 @@
cliIo.timeout(netTimeout);
- JdbcThinTcpIo ioToSameNode = ios.get(cliIo.nodeId());
+ JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo);
- // This can happen if the same node has several IPs.
+ // This can happen if the same node has several IPs or if connection manager background
+ // timer task runs concurrently.
if (ioToSameNode != null)
- ioToSameNode.close();
+ cliIo.close();
+ else
+ connCnt.incrementAndGet();
- ios.put(cliIo.nodeId(), cliIo);
+ prevIgniteEndpointVer = cliIo.igniteVersion();
- connected = true;
-
- prevIgniteEnpointVer = cliIo.igniteVersion();
+ return;
}
catch (Exception exception) {
if (exceptions == null)
@@ -1609,60 +1660,6 @@
}
handleConnectExceptions(exceptions);
-
- iosArr = ios.values().toArray(new JdbcThinTcpIo[0]);
- }
-
- /**
- * Request Timeout Timer Task
- */
- private class RequestTimeoutTimerTask extends TimerTask {
- /** Request id. */
- private final long reqId;
-
- /** Sticky singleIo. */
- private final JdbcThinTcpIo stickyIO;
-
- /** Remaining query timeout. */
- private int remainingQryTimeout;
-
- /** Flag that shows whether TimerTask was expired or not. */
- private AtomicBoolean expired;
-
- /**
- * @param reqId Request Id to cancel in case of timeout
- * @param initReqTimeout Initial request timeout
- */
- RequestTimeoutTimerTask(long reqId, JdbcThinTcpIo stickyIO, int initReqTimeout) {
- this.reqId = reqId;
-
- this.stickyIO = stickyIO;
-
- remainingQryTimeout = initReqTimeout;
-
- expired = new AtomicBoolean(false);
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- try {
- if (remainingQryTimeout <= 0) {
- expired.set(true);
-
- sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId), stickyIO);
-
- cancel();
- }
-
- remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD;
- }
- catch (SQLException e) {
- LOG.log(Level.WARNING,
- "Request timeout processing failure: unable to cancel request [reqId=" + reqId + ']', e);
-
- cancel();
- }
- }
}
/**
@@ -1701,4 +1698,205 @@
}
}
}
+
+ /**
+ * Request Timeout Task
+ */
+ private class RequestTimeoutTask implements Runnable {
+ /** Request id. */
+ private final long reqId;
+
+ /** Sticky singleIo. */
+ private final JdbcThinTcpIo stickyIO;
+
+ /** Remaining query timeout. */
+ private int remainingQryTimeout;
+
+ /** Flag that shows whether TimerTask was expired or not. */
+ private AtomicBoolean expired;
+
+ /**
+ * @param reqId Request Id to cancel in case of timeout
+ * @param initReqTimeout Initial request timeout
+ */
+ RequestTimeoutTask(long reqId, JdbcThinTcpIo stickyIO, int initReqTimeout) {
+ this.reqId = reqId;
+
+ this.stickyIO = stickyIO;
+
+ remainingQryTimeout = initReqTimeout;
+
+ expired = new AtomicBoolean(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ if (remainingQryTimeout <= 0) {
+ expired.set(true);
+
+ sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId), stickyIO);
+
+ qryTimeoutScheduledFut.cancel(false);
+
+ return;
+ }
+
+ remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD;
+ }
+ catch (SQLException e) {
+ LOG.log(Level.WARNING,
+ "Request timeout processing failure: unable to cancel request [reqId=" + reqId + ']', e);
+
+ qryTimeoutScheduledFut.cancel(false);
+ }
+ }
+ }
+
+ /**
+ * Connection Handler Task
+ */
+ private class ConnectionHandlerTask implements Runnable {
+ /** Map with reconnection delays. */
+ private Map<InetSocketAddress, Integer> reconnectionDelays = new HashMap<>();
+
+ /** Map with reconnection delays remainder. */
+ private Map<InetSocketAddress, Integer> reconnectionDelaysRemainder = new HashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ for (Map.Entry<InetSocketAddress, Integer> delayEntry : reconnectionDelaysRemainder.entrySet())
+ reconnectionDelaysRemainder.put(delayEntry.getKey(), delayEntry.getValue() - RECONNECTION_DELAY);
+
+ Set<InetSocketAddress> aliveSockAddrs =
+ ios.values().stream().map(JdbcThinTcpIo::socketAddress).collect(Collectors.toSet());
+
+ IgniteProductVersion prevIgniteEndpointVer = null;
+
+ for (int i = 0; i < connProps.getAddresses().length; i++) {
+ HostAndPortRange srv = connProps.getAddresses()[i];
+
+ try {
+ InetAddress[] addrs = InetAddress.getAllByName(srv.host());
+
+ for (InetAddress addr : addrs) {
+ for (int port = srv.portFrom(); port <= srv.portTo(); ++port) {
+ InetSocketAddress sockAddr = null;
+
+ try {
+ sockAddr = new InetSocketAddress(addr, port);
+
+ if (aliveSockAddrs.contains(sockAddr)) {
+ reconnectionDelaysRemainder.remove(sockAddr);
+ reconnectionDelays.remove(sockAddr);
+
+ continue;
+ }
+
+ Integer delayRemainder = reconnectionDelaysRemainder.get(sockAddr);
+
+ if (delayRemainder != null && delayRemainder != 0)
+ continue;
+
+ if (closed) {
+ maintenanceExecutor.shutdown();
+
+ return;
+ }
+
+ JdbcThinTcpIo cliIo =
+ new JdbcThinTcpIo(connProps, new InetSocketAddress(addr, port), 0);
+
+ if (!cliIo.isAffinityAwarenessSupported()) {
+ processDelay(sockAddr);
+
+ LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+ connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
+ "Node doesn't support best effort affinity mode.");
+
+ cliIo.close();
+
+ continue;
+ }
+
+ if (prevIgniteEndpointVer != null &&
+ !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) {
+ processDelay(sockAddr);
+
+ LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+ connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." +
+ "Different versions of nodes are not supported in best " +
+ "effort affinity mode.");
+
+ cliIo.close();
+
+ continue;
+ }
+
+ cliIo.timeout(netTimeout);
+
+ JdbcThinTcpIo ioToSameNode = ios.putIfAbsent(cliIo.nodeId(), cliIo);
+
+ // This can happen if the same node has several IPs or if ensureConnected() runs
+ // concurrently
+ if (ioToSameNode != null)
+ cliIo.close();
+ else
+ connCnt.incrementAndGet();
+
+ prevIgniteEndpointVer = cliIo.igniteVersion();
+
+ if (closed) {
+ maintenanceExecutor.shutdown();
+
+ cliIo.close();
+
+ ios.remove(cliIo.nodeId());
+
+ return;
+ }
+ }
+ catch (Exception exception) {
+ if (sockAddr != null)
+ processDelay(sockAddr);
+
+ LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+ connProps.getUrl() + "]. address = [" + addr + ':' + port + "].");
+ }
+ }
+ }
+ }
+ catch (Exception exception) {
+ LOG.log(Level.WARNING, "Failed to connect to Ignite node [url=" +
+ connProps.getUrl() + "]. server = [" + srv + "].");
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.log(Level.WARNING, "Connection handler processing failure. Reconnection processes was stopped."
+ , e);
+
+ connectionsHndScheduledFut.cancel(false);
+ }
+ }
+
+ /**
+ * Increase reconnection delay if needed and store it to corresponding maps.
+ *
+ * @param sockAddr Socket address.
+ */
+ private void processDelay(InetSocketAddress sockAddr) {
+ Integer delay = reconnectionDelays.get(sockAddr);
+
+ delay = delay == null ? RECONNECTION_DELAY : delay * 2;
+
+ if (delay > RECONNECTION_MAX_DELAY)
+ delay = RECONNECTION_MAX_DELAY;
+
+ reconnectionDelays.put(sockAddr, delay);
+
+ reconnectionDelaysRemainder.put(sockAddr, delay);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 366be79..7663a80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -122,7 +122,7 @@
private final BufferedInputStream in;
/** Connected flag. */
- private boolean connected;
+ private volatile boolean connected;
/** Ignite server version. */
private final IgniteProductVersion igniteVer;
@@ -422,10 +422,9 @@
JdbcResponse resp = readResponse();
- if (stmt != null && stmt.isCancelled())
- return new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG);
- else
- return resp;
+ return stmt != null && stmt.isCancelled() ?
+ new JdbcResponse(IgniteQueryErrorCode.QUERY_CANCELED, QueryCancelledException.ERR_MSG) :
+ resp;
}
/**
@@ -650,4 +649,18 @@
public UUID nodeId() {
return nodeId;
}
+
+ /**
+ * @return Socket address.
+ */
+ public InetSocketAddress socketAddress() {
+ return sockAddr;
+ }
+
+ /**
+ * @return Connected flag.
+ */
+ public boolean connected() {
+ return connected;
+ }
}