[CALCITE-2882] Connection properties are lost after timeout (bake)

1. Set dirty to true when opening connection.
2. Add test reproducing the problem.
3. Refactor AvaticaServersForTest to be able to pass properties
(Stamatis Zampetakis).

Closes #86
diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index c0c8c22..ddb6d10 100644
--- a/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -109,6 +109,7 @@
     connection.invokeWithRetries(
         new CallableWithoutException<Void>() {
           public Void call() {
+            propsMap.get(ch.id).setDirty(true);
             final Service.OpenConnectionResponse response =
                 service.apply(new Service.OpenConnectionRequest(ch.id, info));
             return null;
diff --git a/server/src/test/java/org/apache/calcite/avatica/remote/AvaticaServersForTest.java b/server/src/test/java/org/apache/calcite/avatica/remote/AvaticaServersForTest.java
index 2843a24..1dfd535 100644
--- a/server/src/test/java/org/apache/calcite/avatica/remote/AvaticaServersForTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/remote/AvaticaServersForTest.java
@@ -19,6 +19,7 @@
 import org.apache.calcite.avatica.ConnectionSpec;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.jdbc.JdbcMeta;
+import org.apache.calcite.avatica.metrics.MetricsSystemConfiguration;
 import org.apache.calcite.avatica.remote.Driver.Serialization;
 import org.apache.calcite.avatica.server.AvaticaHandler;
 import org.apache.calcite.avatica.server.AvaticaJsonHandler;
@@ -35,6 +36,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 
 /**
  * Utility class which encapsulates the setup required to write Avatica tests that run against
@@ -71,6 +73,23 @@
   }
 
   /**
+   * Starts Avatica servers for each serialization type with the properties.
+   */
+  public void startServers(Properties properties) {
+    final HandlerFactory factory = new HandlerFactory();
+
+    // Construct the JSON server
+    Service jsonService =
+            new LocalService(PropertyRemoteJdbcMetaFactory.getInstance(properties));
+    startServer(factory, jsonService, Serialization.JSON, null, null);
+
+    // Construct the Protobuf server
+    Service protobufService =
+            new LocalService(PropertyRemoteJdbcMetaFactory.getInstance(properties));
+    startServer(factory, protobufService, Serialization.PROTOBUF, null, null);
+  }
+
+  /**
    * Starts Avatica servers for each serialization type with the provided {@code serverConfig}.
    */
   public void startServers(AvaticaServerConfiguration serverConfig) {
@@ -78,21 +97,25 @@
 
     // Construct the JSON server
     Service jsonService = new LocalService(FullyRemoteJdbcMetaFactory.getInstance());
-    AvaticaHandler jsonHandler = factory.getHandler(jsonService, Serialization.JSON, null,
-        serverConfig);
-    final HttpServer jsonServer = new HttpServer.Builder().withHandler(jsonHandler)
-        .withPort(0).build();
-    jsonServer.start();
-    serversBySerialization.put(Serialization.JSON, jsonServer);
+    startServer(factory, jsonService, Serialization.JSON, null, serverConfig);
 
     // Construct the Protobuf server
     Service protobufService = new LocalService(FullyRemoteJdbcMetaFactory.getInstance());
-    AvaticaHandler protobufHandler = factory.getHandler(protobufService, Serialization.PROTOBUF,
-        null, serverConfig);
-    final HttpServer protobufServer = new HttpServer.Builder().withHandler(protobufHandler)
-        .withPort(0).build();
-    protobufServer.start();
-    serversBySerialization.put(Serialization.PROTOBUF, protobufServer);
+    startServer(factory, protobufService, Serialization.PROTOBUF, null, serverConfig);
+  }
+
+  /**
+   * Starts Avatica server and cache.
+   */
+  public void startServer(HandlerFactory factory, Service service, Serialization serialization,
+                          MetricsSystemConfiguration metricsConfig,
+                          AvaticaServerConfiguration serverConfig) {
+    AvaticaHandler handler = factory.getHandler(service, serialization,
+            metricsConfig, serverConfig);
+    final HttpServer server = new HttpServer.Builder().withHandler(handler)
+            .withPort(0).build();
+    server.start();
+    serversBySerialization.put(serialization, server);
   }
 
   /**
@@ -164,7 +187,7 @@
       if (instance == null) {
         try {
           instance = new JdbcMeta(CONNECTION_SPEC.url, CONNECTION_SPEC.username,
-              CONNECTION_SPEC.password);
+                  CONNECTION_SPEC.password);
         } catch (SQLException e) {
           throw new RuntimeException(e);
         }
@@ -176,6 +199,34 @@
       return getInstance();
     }
   }
+
+  /** Factory that provides a {@link JdbcMeta} with properties. */
+  public static class PropertyRemoteJdbcMetaFactory implements Meta.Factory {
+
+    private static Map<Properties, JdbcMeta> instances = new HashMap<>();
+
+    static JdbcMeta getInstance(Properties properties) {
+      try {
+        if (properties == null) {
+          return new JdbcMeta(CONNECTION_SPEC.url, CONNECTION_SPEC.username,
+                    CONNECTION_SPEC.password);
+        }
+        if (instances.get(properties) == null) {
+          properties.put("user", CONNECTION_SPEC.username);
+          properties.put("password", CONNECTION_SPEC.password);
+          JdbcMeta instance = new JdbcMeta(CONNECTION_SPEC.url, properties);
+          instances.put(properties, instance);
+        }
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+      return instances.get(properties);
+    }
+
+    @Override public Meta create(List<String> args) {
+      return getInstance(new Properties());
+    }
+  }
 }
 
 // End AvaticaServersForTest.java
diff --git a/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java b/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
new file mode 100644
index 0000000..9527dc5
--- /dev/null
+++ b/server/src/test/java/org/apache/calcite/avatica/remote/ConnectionPropertiesTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.ConnectionSpec;
+import org.apache.calcite.avatica.jdbc.JdbcMeta;
+import org.apache.calcite.avatica.server.HttpServer;
+
+import com.google.common.cache.Cache;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/** Tests covering {@link ConnectionPropertiesTest}. */
+@RunWith(Parameterized.class)
+public class ConnectionPropertiesTest {
+  private static final AvaticaServersForTest SERVERS = new AvaticaServersForTest();
+  private static final Properties PROPERTIES = new Properties();
+
+
+  private final HttpServer server;
+  private final String url;
+  private final int port;
+  private final Driver.Serialization serialization;
+
+  @Parameterized.Parameters(name = "{0}")
+  public static List<Object[]> parameters() throws Exception {
+    PROPERTIES.put(JdbcMeta.ConnectionCacheSettings.EXPIRY_DURATION.key(), "1");
+    PROPERTIES.put(JdbcMeta.ConnectionCacheSettings.EXPIRY_UNIT.key(), TimeUnit.SECONDS.name());
+    SERVERS.startServers(PROPERTIES);
+    return SERVERS.getJUnitParameters();
+  }
+
+  public ConnectionPropertiesTest(Driver.Serialization serialization,
+                                    HttpServer server) {
+    this.server = server;
+    this.port = this.server.getPort();
+    this.serialization = serialization;
+    this.url = SERVERS.getJdbcUrl(port, serialization);
+  }
+
+  @Test
+  public void testConnectionPropertiesSync() throws Exception {
+    ConnectionSpec.getDatabaseLock().lock();
+    try {
+      AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url);
+      conn.setAutoCommit(false);
+      conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+
+      // sync connection properties
+      conn.createStatement();
+      Connection remoteConn = getConnection(
+              AvaticaServersForTest.PropertyRemoteJdbcMetaFactory.getInstance(PROPERTIES), conn.id);
+
+      assertFalse(remoteConn.getAutoCommit());
+      assertEquals(remoteConn.getTransactionIsolation(),
+              Connection.TRANSACTION_REPEATABLE_READ);
+
+      // after 1s, remote connection expired and reopen
+      Thread.sleep(1000);
+
+      conn.createStatement();
+      Connection remoteConn1 = getConnection(
+              AvaticaServersForTest.PropertyRemoteJdbcMetaFactory.getInstance(PROPERTIES), conn.id);
+
+      assertFalse(remoteConn1.getAutoCommit());
+      assertEquals(remoteConn1.getTransactionIsolation(),
+              Connection.TRANSACTION_REPEATABLE_READ);
+    } finally {
+      ConnectionSpec.getDatabaseLock().unlock();
+    }
+  }
+
+  private static Connection getConnection(JdbcMeta m, String id) throws Exception {
+    Field f = JdbcMeta.class.getDeclaredField("connectionCache");
+    f.setAccessible(true);
+    //noinspection unchecked
+    Cache<String, Connection> connectionCache = (Cache<String, Connection>) f.get(m);
+    return connectionCache.getIfPresent(id);
+  }
+}
+
+// End ConnectionPropertiesTest.java
diff --git a/server/src/test/java/org/apache/calcite/avatica/test/AvaticaSuite.java b/server/src/test/java/org/apache/calcite/avatica/test/AvaticaSuite.java
index 53b6c81..661b0d4 100644
--- a/server/src/test/java/org/apache/calcite/avatica/test/AvaticaSuite.java
+++ b/server/src/test/java/org/apache/calcite/avatica/test/AvaticaSuite.java
@@ -18,8 +18,9 @@
 
 import org.apache.calcite.avatica.RemoteDriverTest;
 
-import org.junit.runner.RunWith;
+import org.apache.calcite.avatica.remote.ConnectionPropertiesTest;
 
+import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 /**
@@ -29,7 +30,8 @@
 @Suite.SuiteClasses({
     AvaticaUtilsTest.class,
     ConnectStringParserTest.class,
-    RemoteDriverTest.class
+    RemoteDriverTest.class,
+    ConnectionPropertiesTest.class
     })
 public class AvaticaSuite {
 }