[CALCITE-5009] Transparent JDBC connection re-creation may lead to data loss

CALCITE-903 has introduced a transaprent reconnection feature, which will open
a new server-side connection in case it is expired from the server side connection
cache.

While this is convinient for most read-only analytical workload, this
can cause a number a problems, including data loss for transactional connections.

This patch disables the transparent reconnect feature by default, and adds the
transparent_reconnection property, which re-enables it when set to true.
diff --git a/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 6a5830b..f740d85 100644
--- a/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -66,6 +66,7 @@
    * the number of rows modified. */
   public static final String ROWCOUNT_COLUMN_NAME = "ROWCOUNT";
 
+  //TODO shouldn't we move this to BuiltInConnectionProperty ?
   public static final String NUM_EXECUTE_RETRIES_KEY = "avatica.statement.retries";
   public static final String NUM_EXECUTE_RETRIES_DEFAULT = "5";
 
@@ -96,6 +97,7 @@
   public final Map<Integer, AvaticaStatement> statementMap = new ConcurrentHashMap<>();
   final Map<Integer, AtomicBoolean> flagMap = new ConcurrentHashMap<>();
   protected final long maxRetriesPerExecute;
+  protected final boolean transparentReconnectEnabled;
 
   /**
    * Creates an AvaticaConnection.
@@ -127,6 +129,7 @@
       throw new RuntimeException(e);
     }
     this.maxRetriesPerExecute = getNumStatementRetries(info);
+    this.transparentReconnectEnabled = config().transparentReconnectionEnabled();
   }
 
   /** Computes the number of retries
@@ -792,7 +795,8 @@
         return callable.call();
       } catch (AvaticaClientRuntimeException e) {
         lastException = e;
-        if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE == e.getErrorCode()) {
+        if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE == e.getErrorCode()
+                && transparentReconnectEnabled) {
           this.openConnection();
           continue;
         }
diff --git a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
index 16e1061..0313e4c 100644
--- a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
+++ b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
@@ -86,7 +86,9 @@
   KEY_PASSWORD("key_password", Type.STRING, "", false),
 
   HOSTNAME_VERIFICATION("hostname_verification", Type.ENUM, HostnameVerification.STRICT,
-      HostnameVerification.class, false);
+      HostnameVerification.class, false),
+
+  TRANSPARENT_RECONNECTION("transparent_reconnection", Type.BOOLEAN, Boolean.FALSE, false);
 
   private final String camelName;
   private final Type type;
diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
index bbbfa87..30d0c57 100644
--- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
+++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
@@ -62,6 +62,8 @@
   String keyPassword();
   /** @see BuiltInConnectionProperty#HOSTNAME_VERIFICATION */
   HostnameVerification hostnameVerification();
+  /** @see BuiltInConnectionProperty#TRANSPARENT_RECONNECTION */
+  boolean transparentReconnectionEnabled();
 }
 
 // End ConnectionConfig.java
diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
index 36cdf61..7a2cbe4 100644
--- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
+++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
@@ -128,6 +128,11 @@
         .getEnum(HostnameVerification.class);
   }
 
+  @Override public boolean transparentReconnectionEnabled() {
+    return BuiltInConnectionProperty.TRANSPARENT_RECONNECTION.wrap(properties)
+       .getBoolean();
+  }
+
   /** Converts a {@link Properties} object containing (name, value)
    * pairs into a map whose keys are
    * {@link org.apache.calcite.avatica.InternalProperty} objects.
diff --git a/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java b/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
index 9527dc5..515c8b9 100644
--- a/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.calcite.avatica.remote;
 
+import org.apache.calcite.avatica.AvaticaClientRuntimeException;
 import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.BuiltInConnectionProperty;
 import org.apache.calcite.avatica.ConnectionSpec;
 import org.apache.calcite.avatica.jdbc.JdbcMeta;
 import org.apache.calcite.avatica.server.HttpServer;
@@ -36,6 +38,8 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /** Tests covering {@link ConnectionPropertiesTest}. */
 @RunWith(Parameterized.class)
@@ -48,6 +52,7 @@
   private final String url;
   private final int port;
   private final Driver.Serialization serialization;
+  private final Properties clientReconnectProps = new Properties();
 
   @Parameterized.Parameters(name = "{0}")
   public static List<Object[]> parameters() throws Exception {
@@ -63,13 +68,16 @@
     this.port = this.server.getPort();
     this.serialization = serialization;
     this.url = SERVERS.getJdbcUrl(port, serialization);
+    this.clientReconnectProps.setProperty(
+        BuiltInConnectionProperty.TRANSPARENT_RECONNECTION.camelName(), "true");
   }
 
   @Test
   public void testConnectionPropertiesSync() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+      AvaticaConnection conn =
+              (AvaticaConnection) DriverManager.getConnection(url, clientReconnectProps);
       conn.setAutoCommit(false);
       conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
 
@@ -97,6 +105,37 @@
     }
   }
 
+  @Test
+  public void testConnectionPropertiesSyncNoReconnect() throws Exception {
+    ConnectionSpec.getDatabaseLock().lock();
+    try {
+      AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+      conn.setAutoCommit(false);
+      conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+
+      // sync connection properties
+      conn.createStatement();
+      Connection remoteConn = getConnection(
+              AvaticaServersForTest.PropertyRemoteJdbcMetaFactory.getInstance(PROPERTIES), conn.id);
+
+      assertFalse(remoteConn.getAutoCommit());
+      assertEquals(remoteConn.getTransactionIsolation(),
+              Connection.TRANSACTION_REPEATABLE_READ);
+
+      // after 1s, remote connection expired and reopen
+      Thread.sleep(1000);
+
+      try {
+        conn.createStatement();
+        fail("Should have thrown AvaticaClientRuntimeException");
+      } catch (Exception e) {
+        assertTrue(e instanceof AvaticaClientRuntimeException);
+      }
+    } finally {
+      ConnectionSpec.getDatabaseLock().unlock();
+    }
+  }
+
   private static Connection getConnection(JdbcMeta m, String id) throws Exception {
     Field f = JdbcMeta.class.getDeclaredField("connectionCache");
     f.setAccessible(true);