Merge branch '3.0.x' into 3.1.x
diff --git a/changelog/README.md b/changelog/README.md
index 626c379..7fc0026 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -136,6 +136,7 @@
### 3.0.8 (in progress)
- [bug] JAVA-1404: Fix min token handling in TokenRange.contains.
+- [bug] JAVA-1429: Prevent heartbeats until connection is fully initialized.
### 3.0.7
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
index 9edb011..39d15e3 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java
@@ -1077,7 +1077,7 @@
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (!isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+ if (isInitialized && !isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
logger.debug("{} was inactive for {} seconds, sending heartbeat", Connection.this, factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds());
write(HEARTBEAT_CALLBACK);
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java b/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java
index 41a1e4e..c96cc69 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java
@@ -20,6 +20,9 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
@@ -86,4 +89,47 @@
}
}
+ /**
+ * Ensures that authentication is possible even if the server is busy during
+ * SASL handshake.
+ *
+ * The heartbeat interval is set to 1 second, which is shorter than the 2 second
+ * pause triggered by {@link SlowAuthProvider}, so a reader-idle event can fire
+ * before the connection has finished initializing.
+ *
+ * @jira_ticket JAVA-1429
+ */
+ @Test(groups = "short")
+ @CCMConfig(dirtiesContext = true)
+ public void should_connect_with_slow_server() throws InterruptedException {
+     Cluster cluster = Cluster.builder()
+             .addContactPoints(getContactPoints())
+             .withPort(ccm().getBinaryPort())
+             .withAuthProvider(new SlowAuthProvider())
+             .withPoolingOptions(new PoolingOptions()
+                     .setHeartbeatIntervalSeconds(1))
+             .build();
+     try {
+         cluster.connect();
+     } finally {
+         // Close the cluster whether or not connect() succeeded, so the test
+         // does not leave driver connections and threads behind.
+         cluster.close();
+     }
+ }
+
+ /**
+ * Plain-text auth provider that calls {@code simulateBusyServer()} every time an
+ * authenticator is requested, then delegates to {@link PlainTextAuthProvider}.
+ *
+ * Kept as a (non-static) inner class because {@code simulateBusyServer()} is an
+ * instance method of the enclosing test.
+ */
+ private class SlowAuthProvider extends PlainTextAuthProvider {
+
+ public SlowAuthProvider() {
+ super("cassandra", "cassandra");
+ }
+
+ @Override
+ public Authenticator newAuthenticator(InetSocketAddress host, String authenticator) throws AuthenticationException {
+ // Pause the node first; the authenticator itself is the regular plain-text one.
+ simulateBusyServer();
+ return super.newAuthenticator(host, authenticator);
+ }
+
+ }
+
+ /**
+ * Pauses node 1 and schedules it to be resumed 2 seconds later from a
+ * one-shot background timer.
+ */
+ private void simulateBusyServer() {
+     ccm().pause(1);
+     final Timer timer = new Timer();
+     timer.schedule(new TimerTask() {
+         @Override
+         public void run() {
+             try {
+                 ccm().resume(1);
+             } finally {
+                 // The timer is used exactly once: cancel it so that its
+                 // non-daemon thread terminates instead of lingering after
+                 // every authenticator request.
+                 timer.cancel();
+             }
+         }
+     }, 2000);
+ }
+
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java b/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java
index 6aa01f1..4d3b523 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java
@@ -169,6 +169,20 @@
void forceStop(int n);
/**
+ * Pauses the {@code nth} host in the CCM cluster.
+ * Counterpart of {@link #resume(int)}.
+ *
+ * @param n the node number (starting from 1).
+ */
+ void pause(int n);
+
+ /**
+ * Resumes the {@code nth} host in the CCM cluster.
+ * Counterpart of {@link #pause(int)}.
+ *
+ * @param n the node number (starting from 1).
+ */
+ void resume(int n);
+
+ /**
* Removes the {@code nth} host in the CCM cluster.
*
* @param n the node number (starting from 1).
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
index 8b800ac..343b294 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
@@ -522,6 +522,18 @@
}
@Override
+ public void pause(int n) {
+ // Logs the target node, then issues the "node<n> pause" sub-command through execute().
+ logger.debug(String.format("Pausing: node %s (%s%s:%s) in %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
+ execute(CCM_COMMAND + " node%d pause", n);
+ }
+
+ @Override
+ public void resume(int n) {
+ // Logs the target node, then issues the "node<n> resume" sub-command through execute().
+ logger.debug(String.format("Resuming: node %s (%s%s:%s) in %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
+ execute(CCM_COMMAND + " node%d resume", n);
+ }
+
+ @Override
public void remove(int n) {
logger.debug(String.format("Removing: node %s (%s%s:%s) from %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
execute(CCM_COMMAND + " node%d remove", n);
@@ -679,6 +691,7 @@
/**
* Waits for a host to be up by pinging the TCP socket directly, without using the Java driver's API.
*/
+ @Override
public void waitForUp(int node) {
TestUtils.waitUntilPortIsUp(addressOfNode(node));
}
@@ -686,6 +699,7 @@
/**
* Waits for a host to be down by pinging the TCP socket directly, without using the Java driver's API.
*/
+ @Override
public void waitForDown(int node) {
TestUtils.waitUntilPortIsDown(addressOfNode(node));
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java b/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java
index 4a1c631..0431904 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMCache.java
@@ -166,6 +166,16 @@
}
@Override
+ public void pause(int n) {
+ // Forwards the call unchanged to the underlying ccm instance.
+ ccm.pause(n);
+ }
+
+ @Override
+ public void resume(int n) {
+ // Forwards the call unchanged to the underlying ccm instance.
+ ccm.resume(n);
+ }
+
+ @Override
public void remove(int n) {
ccm.remove(n);
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java b/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
index f8d18c0..853d844 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java
@@ -178,6 +178,16 @@
}
@Override
+ public void pause(int n) {
+ // Always rejected on this read-only view, same as remove(int).
+ throw new UnsupportedOperationException("This CCM cluster is read-only");
+ }
+
+ @Override
+ public void resume(int n) {
+ // Always rejected on this read-only view, same as remove(int).
+ throw new UnsupportedOperationException("This CCM cluster is read-only");
+ }
+
+ @Override
public void remove(int n) {
throw new UnsupportedOperationException("This CCM cluster is read-only");
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java
index bf8ee33..33d8567 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java
@@ -18,7 +18,7 @@
import com.codahale.metrics.Gauge;
import com.datastax.driver.core.exceptions.*;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.utils.MoreFutures;
+import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.*;
@@ -290,7 +290,7 @@
int count = 0;
for (MockRequest queuedRequest : queuedRequests) {
try {
- Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 5, TimeUnit.SECONDS);
+ Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 10, TimeUnit.SECONDS);
count++;
} catch (ExecutionException e) {
// 128th request should timeout since all in flight requests are used.
@@ -400,7 +400,6 @@
assertThat(e.getCause()).isInstanceOf(DriverException.class);
assertThat(e.getCause().getMessage()).contains("Aborting attempt to set keyspace to 'newkeyspace' since there is already an in flight attempt to set keyspace to 'slowks'.");
}
-
} finally {
MockRequest.completeAll(requests);
cluster.close();
@@ -1242,8 +1241,8 @@
// Should create up to core connections.
verify(factory, timeout(readTimeout).times(1)).open(any(HostConnectionPool.class));
-
assertPoolSize(pool, 1);
+ Uninterruptibles.getUninterruptibly(request.requestInitialized, 10, TimeUnit.SECONDS);
request.simulateSuccessResponse();
} finally {
cluster.close();
@@ -1348,6 +1347,9 @@
enum State {START, COMPLETED, FAILED, TIMED_OUT}
final ListenableFuture<Connection> connectionFuture;
+
+ final ListenableFuture<Connection.ResponseHandler> requestInitialized;
+
private volatile Connection.ResponseHandler responseHandler;
final AtomicReference<State> state = new AtomicReference<State>(State.START);
@@ -1404,12 +1406,13 @@
private MockRequest(HostConnectionPool pool, int timeoutMillis, int maxQueueSize) throws ConnectionException {
this.connectionFuture = pool.borrowConnection(timeoutMillis, MILLISECONDS, maxQueueSize);
- Futures.addCallback(this.connectionFuture, new MoreFutures.SuccessCallback<Connection>() {
+ // Exposed as a future (rather than a fire-and-forget callback) so that a test can wait
+ // until the response handler has been registered with the connection's dispatcher.
+ requestInitialized = Futures.transform(this.connectionFuture, new Function<Connection, Connection.ResponseHandler>() {
@Override
- public void onSuccess(Connection connection) {
+ public Connection.ResponseHandler apply(Connection connection) {
MockRequest thisRequest = MockRequest.this;
thisRequest.responseHandler = new Connection.ResponseHandler(connection, -1, thisRequest);
connection.dispatcher.add(thisRequest.responseHandler);
+ return responseHandler;
}
});
}
diff --git a/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java b/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java
index cc71e48..0479e1c 100644
--- a/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java
+++ b/driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java
@@ -17,6 +17,7 @@
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CCMTestsSupport;
+import com.datastax.driver.core.utils.CassandraVersion;
import com.datastax.driver.mapping.Mapper.Option;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
@@ -40,6 +41,7 @@
mapper = new MappingManager(session()).mapper(User.class);
}
+ @CassandraVersion("2.1.0")
@Test(groups = "short")
void should_save_null_fields_if_requested() {
should_save_null_fields(true, Option.saveNullFields(true));