[CALCITE-5009] Transparent JDBC connection re-creation may lead to data loss
CALCITE-903 has introduced a transaprent reconnection feature, which will open
a new server-side connection in case it is expired from the server side connection
cache.
While this is convinient for most read-only analytical workload, this
can cause a number a problems, including data loss for transactional connections.
This patch disables the transparent reconnect feature by default, and adds the
transparent_reconnection property, which re-enables it when set to true.
diff --git a/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 6a5830b..f740d85 100644
--- a/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -66,6 +66,7 @@
* the number of rows modified. */
public static final String ROWCOUNT_COLUMN_NAME = "ROWCOUNT";
+ //TODO shouldn't we move this to BuiltInConnectionProperty ?
public static final String NUM_EXECUTE_RETRIES_KEY = "avatica.statement.retries";
public static final String NUM_EXECUTE_RETRIES_DEFAULT = "5";
@@ -96,6 +97,7 @@
public final Map<Integer, AvaticaStatement> statementMap = new ConcurrentHashMap<>();
final Map<Integer, AtomicBoolean> flagMap = new ConcurrentHashMap<>();
protected final long maxRetriesPerExecute;
+ protected final boolean transparentReconnectEnabled;
/**
* Creates an AvaticaConnection.
@@ -127,6 +129,7 @@
throw new RuntimeException(e);
}
this.maxRetriesPerExecute = getNumStatementRetries(info);
+ this.transparentReconnectEnabled = config().transparentReconnectionEnabled();
}
/** Computes the number of retries
@@ -792,7 +795,8 @@
return callable.call();
} catch (AvaticaClientRuntimeException e) {
lastException = e;
- if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE == e.getErrorCode()) {
+ if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE == e.getErrorCode()
+ && transparentReconnectEnabled) {
this.openConnection();
continue;
}
diff --git a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
index 16e1061..0313e4c 100644
--- a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
+++ b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
@@ -86,7 +86,9 @@
KEY_PASSWORD("key_password", Type.STRING, "", false),
HOSTNAME_VERIFICATION("hostname_verification", Type.ENUM, HostnameVerification.STRICT,
- HostnameVerification.class, false);
+ HostnameVerification.class, false),
+
+ TRANSPARENT_RECONNECTION("transparent_reconnection", Type.BOOLEAN, Boolean.FALSE, false);
private final String camelName;
private final Type type;
diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
index bbbfa87..30d0c57 100644
--- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
+++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
@@ -62,6 +62,8 @@
String keyPassword();
/** @see BuiltInConnectionProperty#HOSTNAME_VERIFICATION */
HostnameVerification hostnameVerification();
+ /** @see BuiltInConnectionProperty#TRANSPARENT_RECONNECTION */
+ boolean transparentReconnectionEnabled();
}
// End ConnectionConfig.java
diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
index 36cdf61..7a2cbe4 100644
--- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
+++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
@@ -128,6 +128,11 @@
.getEnum(HostnameVerification.class);
}
+ @Override public boolean transparentReconnectionEnabled() {
+ return BuiltInConnectionProperty.TRANSPARENT_RECONNECTION.wrap(properties)
+ .getBoolean();
+ }
+
/** Converts a {@link Properties} object containing (name, value)
* pairs into a map whose keys are
* {@link org.apache.calcite.avatica.InternalProperty} objects.
diff --git a/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java b/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
index 9527dc5..515c8b9 100644
--- a/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.calcite.avatica.remote;
+import org.apache.calcite.avatica.AvaticaClientRuntimeException;
import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.BuiltInConnectionProperty;
import org.apache.calcite.avatica.ConnectionSpec;
import org.apache.calcite.avatica.jdbc.JdbcMeta;
import org.apache.calcite.avatica.server.HttpServer;
@@ -36,6 +38,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/** Tests covering {@link ConnectionPropertiesTest}. */
@RunWith(Parameterized.class)
@@ -48,6 +52,7 @@
private final String url;
private final int port;
private final Driver.Serialization serialization;
+ private final Properties clientReconnectProps = new Properties();
@Parameterized.Parameters(name = "{0}")
public static List<Object[]> parameters() throws Exception {
@@ -63,13 +68,16 @@
this.port = this.server.getPort();
this.serialization = serialization;
this.url = SERVERS.getJdbcUrl(port, serialization);
+ this.clientReconnectProps.setProperty(
+ BuiltInConnectionProperty.TRANSPARENT_RECONNECTION.camelName(), "true");
}
@Test
public void testConnectionPropertiesSync() throws Exception {
ConnectionSpec.getDatabaseLock().lock();
try {
- AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+ AvaticaConnection conn =
+ (AvaticaConnection) DriverManager.getConnection(url, clientReconnectProps);
conn.setAutoCommit(false);
conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
@@ -97,6 +105,37 @@
}
}
+ @Test
+ public void testConnectionPropertiesSyncNoReconnect() throws Exception {
+ ConnectionSpec.getDatabaseLock().lock();
+ try {
+ AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+ conn.setAutoCommit(false);
+ conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+
+ // sync connection properties
+ conn.createStatement();
+ Connection remoteConn = getConnection(
+ AvaticaServersForTest.PropertyRemoteJdbcMetaFactory.getInstance(PROPERTIES), conn.id);
+
+ assertFalse(remoteConn.getAutoCommit());
+ assertEquals(remoteConn.getTransactionIsolation(),
+ Connection.TRANSACTION_REPEATABLE_READ);
+
+ // after 1s, remote connection expired and reopen
+ Thread.sleep(1000);
+
+ try {
+ conn.createStatement();
+ fail("Should have thrown AvaticaClientRuntimeException");
+ } catch (Exception e) {
+ assertTrue(e instanceof AvaticaClientRuntimeException);
+ }
+ } finally {
+ ConnectionSpec.getDatabaseLock().unlock();
+ }
+ }
+
private static Connection getConnection(JdbcMeta m, String id) throws Exception {
Field f = JdbcMeta.class.getDeclaredField("connectionCache");
f.setAccessible(true);