Merge branch '3.0.x' into 3.1.x
diff --git a/changelog/README.md b/changelog/README.md
index 626c379..7fc0026 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -136,6 +136,7 @@
 ### 3.0.8 (in progress)
 
 - [bug] JAVA-1404: Fix min token handling in TokenRange.contains.
+- [bug] JAVA-1429: Prevent heartbeats until connection is fully initialized.
 
 
 ### 3.0.7
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
index 9edb011..39d15e3 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
@@ -1077,7 +1077,7 @@
 
         @Override
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-            if (!isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+            if (isInitialized && !isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
                 logger.debug("{} was inactive for {} seconds, sending heartbeat", Connection.this, factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds());
                 write(HEARTBEAT_CALLBACK);
             }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java b/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java
index 41a1e4e..c96cc69 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java
@@ -20,6 +20,9 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
 import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
@@ -86,4 +89,70 @@
         }
     }
 
+    /**
+     * Ensures that authentication is possible even if the server is busy during
+     * SASL handshake.
+     * <p>
+     * The heartbeat interval (1 second) is shorter than the pause applied by
+     * {@link #simulateBusyServer()} (2 seconds), so an idle event is expected
+     * to fire before the connection is fully initialized.
+     *
+     * @jira_ticket JAVA-1429
+     */
+    @Test(groups = "short")
+    @CCMConfig(dirtiesContext = true)
+    public void should_connect_with_slow_server() throws InterruptedException {
+        Cluster cluster = Cluster.builder()
+                .addContactPoints(getContactPoints())
+                .withPort(ccm().getBinaryPort())
+                .withAuthProvider(new SlowAuthProvider())
+                .withPoolingOptions(new PoolingOptions()
+                        .setHeartbeatIntervalSeconds(1))
+                .build();
+        try {
+            cluster.connect();
+        } finally {
+            // Always release the driver's resources, even if connect() throws.
+            cluster.close();
+        }
+    }
+
+    /**
+     * Plain-text auth provider that pauses node 1 (see {@link #simulateBusyServer()})
+     * each time an authenticator is requested, then delegates to the parent class.
+     */
+    private class SlowAuthProvider extends PlainTextAuthProvider {
+
+        public SlowAuthProvider() {
+            super("cassandra", "cassandra");
+        }
+
+        @Override
+        public Authenticator newAuthenticator(InetSocketAddress host, String authenticator) throws AuthenticationException {
+            simulateBusyServer();
+            return super.newAuthenticator(host, authenticator);
+        }
+
+    }
+
+    /**
+     * Pauses node 1 and schedules its resumption 2 seconds later on a timer thread.
+     */
+    private void simulateBusyServer() {
+        ccm().pause(1);
+        final Timer timer = new Timer();
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    ccm().resume(1);
+                } finally {
+                    // Stop the timer thread once its single task has run; otherwise
+                    // every call would leave a non-daemon thread behind.
+                    timer.cancel();
+                }
+            }
+        }, 2000);
+    }
+
 }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java b/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java
index 6aa01f1..4d3b523 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java
@@ -169,6 +169,20 @@
     void forceStop(int n);
 
     /**
+     * Pauses the {@code nth} host in the CCM cluster.
+     *
+     * @param n the node number (starting from 1).
+     */
+    void pause(int n);
+
+    /**
+     * Resumes the {@code nth} host in the CCM cluster.
+     *
+     * @param n the node number (starting from 1).
+     */
+    void resume(int n);
+
+    /**
      * Removes the {@code nth} host in the CCM cluster.
      *
      * @param n the node number (starting from 1).
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
index 8b800ac..343b294 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
@@ -522,6 +522,18 @@
     }
 
     @Override
+    public void pause(int n) {
+        logger.debug(String.format("Pausing: node %s (%s%s:%s) in %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
+        execute(CCM_COMMAND + " node%d pause", n);
+    }
+
+    @Override
+    public void resume(int n) {
+        logger.debug(String.format("Resuming: node %s (%s%s:%s) in %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
+        execute(CCM_COMMAND + " node%d resume", n);
+    }
+
+    @Override
     public void remove(int n) {
         logger.debug(String.format("Removing: node %s (%s%s:%s) from %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
         execute(CCM_COMMAND + " node%d remove", n);
@@ -679,6 +691,7 @@
     /**
      * Waits for a host to be up by pinging the TCP socket directly, without using the Java driver's API.
      */
+    @Override
     public void waitForUp(int node) {
         TestUtils.waitUntilPortIsUp(addressOfNode(node));
     }
@@ -686,6 +699,7 @@
     /**
      * Waits for a host to be down by pinging the TCP socket directly, without using the Java driver's API.
      */
+    @Override
     public void waitForDown(int node) {
         TestUtils.waitUntilPortIsDown(addressOfNode(node));
     }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java b/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java
index 4a1c631..0431904 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java
@@ -166,6 +166,16 @@
         }
 
         @Override
+        public void pause(int n) {
+            ccm.pause(n);
+        }
+
+        @Override
+        public void resume(int n) {
+            ccm.resume(n);
+        }
+
+        @Override
         public void remove(int n) {
             ccm.remove(n);
         }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java b/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
index f8d18c0..853d844 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
@@ -178,6 +178,16 @@
         }
 
         @Override
+        public void pause(int n) {
+            throw new UnsupportedOperationException("This CCM cluster is read-only");
+        }
+
+        @Override
+        public void resume(int n) {
+            throw new UnsupportedOperationException("This CCM cluster is read-only");
+        }
+
+        @Override
         public void remove(int n) {
             throw new UnsupportedOperationException("This CCM cluster is read-only");
         }
diff --git a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java
index bf8ee33..33d8567 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java
@@ -18,7 +18,7 @@
 import com.codahale.metrics.Gauge;
 import com.datastax.driver.core.exceptions.*;
 import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.utils.MoreFutures;
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.*;
@@ -290,7 +290,7 @@
             int count = 0;
             for (MockRequest queuedRequest : queuedRequests) {
                 try {
-                    Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 5, TimeUnit.SECONDS);
+                    Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 10, TimeUnit.SECONDS);
                     count++;
                 } catch (ExecutionException e) {
                     // 128th request should timeout since all in flight requests are used.
@@ -400,7 +400,6 @@
                 assertThat(e.getCause()).isInstanceOf(DriverException.class);
                 assertThat(e.getCause().getMessage()).contains("Aborting attempt to set keyspace to 'newkeyspace' since there is already an in flight attempt to set keyspace to 'slowks'.");
             }
-
         } finally {
             MockRequest.completeAll(requests);
             cluster.close();
@@ -1242,8 +1241,8 @@
 
             // Should create up to core connections.
             verify(factory, timeout(readTimeout).times(1)).open(any(HostConnectionPool.class));
-
             assertPoolSize(pool, 1);
+            Uninterruptibles.getUninterruptibly(request.requestInitialized, 10, TimeUnit.SECONDS);
             request.simulateSuccessResponse();
         } finally {
             cluster.close();
@@ -1348,6 +1347,9 @@
         enum State {START, COMPLETED, FAILED, TIMED_OUT}
 
         final ListenableFuture<Connection> connectionFuture;
+
+        final ListenableFuture<Connection.ResponseHandler> requestInitialized;
+
         private volatile Connection.ResponseHandler responseHandler;
 
         final AtomicReference<State> state = new AtomicReference<State>(State.START);
@@ -1404,12 +1406,13 @@
 
         private MockRequest(HostConnectionPool pool, int timeoutMillis, int maxQueueSize) throws ConnectionException {
             this.connectionFuture = pool.borrowConnection(timeoutMillis, MILLISECONDS, maxQueueSize);
-            Futures.addCallback(this.connectionFuture, new MoreFutures.SuccessCallback<Connection>() {
+            requestInitialized = Futures.transform(this.connectionFuture, new Function<Connection, Connection.ResponseHandler>() {
                 @Override
-                public void onSuccess(Connection connection) {
+                public Connection.ResponseHandler apply(Connection connection) {
                     MockRequest thisRequest = MockRequest.this;
                     thisRequest.responseHandler = new Connection.ResponseHandler(connection, -1, thisRequest);
                     connection.dispatcher.add(thisRequest.responseHandler);
+                    return responseHandler;
                 }
             });
         }
diff --git a/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java b/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java
index cc71e48..0479e1c 100644
--- a/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java
+++ b/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java
@@ -17,6 +17,7 @@
 
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.CCMTestsSupport;
+import com.datastax.driver.core.utils.CassandraVersion;
 import com.datastax.driver.mapping.Mapper.Option;
 import com.datastax.driver.mapping.annotations.PartitionKey;
 import com.datastax.driver.mapping.annotations.Table;
@@ -40,6 +41,7 @@
         mapper = new MappingManager(session()).mapper(User.class);
     }
 
+    @CassandraVersion("2.1.0")
     @Test(groups = "short")
     void should_save_null_fields_if_requested() {
         should_save_null_fields(true, Option.saveNullFields(true));