The driver communicates with Cassandra over TCP, using the Cassandra binary protocol. This protocol is asynchronous, which allows each TCP connection to handle multiple simultaneous requests: each request is tagged with a stream id, and the server's response carries that same id, so the driver knows which query it belongs to (and which ResultSetFuture will get completed).

You don't need to manage connections yourself. You simply interact with a Session object, which takes care of it.
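For illustration, here is a minimal sketch (assuming a single node reachable at 127.0.0.1, and queries against the system.local table) of issuing several asynchronous requests through one Session; the driver multiplexes them over its pooled connections and completes each ResultSetFuture as responses arrive:

```java
Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .build();
Session session = cluster.connect();

// Both requests may share a single TCP connection; stream ids let the driver
// match each response to the right future.
ResultSetFuture f1 = session.executeAsync("SELECT release_version FROM system.local");
ResultSetFuture f2 = session.executeAsync("SELECT cluster_name FROM system.local");

System.out.println(f1.getUninterruptibly().one().getString(0));
System.out.println(f2.getUninterruptibly().one().getString(0));

cluster.close();
```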
For each Session, there is one connection pool per connected host (a host is connected when it is up and not ignored by the load balancing policy).

The number of connections per pool is configurable (this will be described in the next section). The number of stream ids depends on the native protocol version:

- protocol v2 or below: 128 stream ids per connection;
- protocol v3 or above: up to 32768 stream ids per connection.
```
+-------+1   n+-------+1   n+----+1   n+----------+1   128/32K+-------+
|Cluster+-----+Session+-----+Pool+-----+Connection+-----------+Request+
+-------+     +-------+     +----+     +----------+           +-------+
```
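Once the session is connected, you can check which protocol version was negotiated (and therefore how many stream ids each connection supports). A small sketch, reusing the cluster object from the previous example:

```java
// The negotiated version determines the number of stream ids per connection
// (128 for v2 and below, 32768 for v3 and above).
ProtocolVersion version =
        cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
System.out.println("Negotiated native protocol version: " + version);
```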
Connection pools are configured with a PoolingOptions object, which is global to a Cluster instance. You can pass that object when building the cluster:
```java
PoolingOptions poolingOptions = new PoolingOptions();
// customize options...

Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withPoolingOptions(poolingOptions)
        .build();
```
Most options can also be changed at runtime. If you don't have a reference to the PoolingOptions instance, here's how you can get it:
```java
PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
// customize options...
```
Connection pools have a variable size, which gets adjusted automatically depending on the current load. There will always be at least a core number of connections, and at most a max number. These values can be configured independently by host distance (the distance is determined by your LoadBalancingPolicy, and will generally indicate whether a host is in the same datacenter or not).
```java
poolingOptions
        .setCoreConnectionsPerHost(HostDistance.LOCAL,  4)
        .setMaxConnectionsPerHost( HostDistance.LOCAL, 10)
        .setCoreConnectionsPerHost(HostDistance.REMOTE, 2)
        .setMaxConnectionsPerHost( HostDistance.REMOTE, 4);
```
For convenience, core and max can be set simultaneously:
```java
poolingOptions
        .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
        .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
```
The default settings are:

- protocol v2:
  - LOCAL hosts: core = 2, max = 8
  - REMOTE hosts: core = 1, max = 2
- protocol v3 and above:
  - LOCAL hosts: core = max = 1
  - REMOTE hosts: core = max = 1

PoolingOptions.setNewConnectionThreshold determines the threshold that triggers the creation of a new connection when the pool is not at its maximum capacity. In general, you shouldn't need to change its default value.
If core != max, the pool will resize automatically to adjust to the current activity on the host.
When activity goes up and there are n connections with n < max, the driver will add a connection when the number of concurrent requests is more than (n - 1) * 128 + PoolingOptions.setNewConnectionThreshold (in layman's terms, when all but the last connection are full and the last connection is above the threshold).
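As a purely illustrative example (these are not the driver's defaults): with 128 requests per connection and a threshold of 100, a pool holding n = 2 connections would open a third once more than (2 - 1) * 128 + 100 = 228 requests are in flight. The threshold is set per host distance:

```java
// Illustrative value only; in general, keep the default.
poolingOptions.setNewConnectionThreshold(HostDistance.LOCAL, 100);
```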
When activity goes down, the driver will “trash” connections if the maximum number of requests in a 10 second time period can be satisfied by fewer connections than are currently open. Trashed connections are kept open but do not accept new requests. After a given timeout (defined by PoolingOptions.setIdleTimeoutSeconds), trashed connections are closed and removed. If activity increases again during that idle period, those connections are resurrected into the active pool and reused. The main intent is to avoid constantly closing and recreating connections when activity fluctuates over short intervals.
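The idle timeout is also configured on PoolingOptions; the value below is only an example:

```java
// Close trashed connections after 2 minutes without traffic (illustrative value).
poolingOptions.setIdleTimeoutSeconds(120);
```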
PoolingOptions.setMaxRequestsPerConnection allows you to throttle the number of concurrent requests per connection.
With protocol v2, there is no reason to throttle. It is set to 128 (the max) and you should not change it.
With protocol v3, it is set to 1024 for LOCAL hosts, and 256 for REMOTE hosts. These low defaults were chosen so that the default configurations for protocol v2 and v3 allow the same total number of simultaneous requests per host (for LOCAL hosts: 8 connections × 128 stream ids = 1024 with v2, versus 1 connection × 1024 with v3), to avoid bad surprises when clients migrate from v2 to v3. You can raise this threshold, or even set it to the max:
```java
poolingOptions
        .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
        .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
```
Just keep in mind that high values will give clients more bandwidth and therefore put more pressure on your cluster. This might require some tuning, especially if you have many clients.
If connections stay idle for too long, they might be dropped by intermediate network devices (routers, firewalls...). Normally, TCP keepalive should take care of this; but tweaking low-level keepalive settings might be impractical in some environments.
The driver provides application-side keepalive in the form of a connection heartbeat: when a connection has been idle for a given amount of time, the driver will simulate activity by writing a dummy request to it.
This feature is enabled by default. The default heartbeat interval is 30 seconds; it can be customized with the following method:
```java
poolingOptions.setHeartbeatIntervalSeconds(60);
```
If it gets changed at runtime, only connections created after that will use the new interval. Most users will want to do this at startup.
The heartbeat interval should be set higher than SocketOptions.readTimeoutMillis: the read timeout is the maximum time that the driver waits for a regular query to complete, therefore the connection should not be considered idle before it has elapsed.
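For instance, here is a sketch (values are illustrative) that keeps the heartbeat interval above the read timeout when building the cluster:

```java
SocketOptions socketOptions = new SocketOptions()
        .setReadTimeoutMillis(12000);          // max wait for a regular query
PoolingOptions poolingOptions = new PoolingOptions()
        .setHeartbeatIntervalSeconds(30);      // heartbeat only after 30 s of idleness

Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withSocketOptions(socketOptions)
        .withPoolingOptions(poolingOptions)
        .build();
```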
To disable heartbeat, set the interval to 0.
Implementation note: the dummy request sent by heartbeat is an OPTIONS message.
When the driver tries to send a request to a host, it will first try to acquire a connection from this host's pool. If the pool is busy (i.e. all connections are already handling their maximum number of in flight requests), the acquisition attempt gets enqueued until a connection becomes available again.
Two options control that queue: a maximum size (PoolingOptions.setMaxQueueSize) and a timeout (PoolingOptions.setPoolTimeoutMillis).
- if maxQueueSize requests are already waiting for a connection, the attempt is rejected;
- if a connection becomes available before poolTimeoutMillis has elapsed, the attempt succeeds; otherwise it is rejected.

If the attempt is rejected, the driver will move to the next host in the query plan, and try to acquire a connection again.
If all hosts are busy with a full queue, the request will fail with a NoHostAvailableException. If you inspect the map returned by this exception's getErrors method, you will see a BusyPoolException for each host.
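Both options are set on PoolingOptions; the values below are illustrative:

```java
// Queue at most 256 pending acquisitions per host, and give up on an enqueued
// attempt if no connection frees up within 5 seconds (illustrative values).
poolingOptions
        .setMaxQueueSize(256)
        .setPoolTimeoutMillis(5000);
```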
The easiest way to monitor pool usage is with Session.getState. Here's a simple example that will print the number of open connections, active requests, and maximum capacity for each host, every 5 seconds:
```java
final LoadBalancingPolicy loadBalancingPolicy =
        cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
final PoolingOptions poolingOptions =
        cluster.getConfiguration().getPoolingOptions();

ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
scheduled.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        Session.State state = session.getState();
        for (Host host : state.getConnectedHosts()) {
            HostDistance distance = loadBalancingPolicy.distance(host);
            int connections = state.getOpenConnections(host);
            int inFlightQueries = state.getInFlightQueries(host);
            System.out.printf("%s connections=%d, current load=%d, max load=%d%n",
                    host, connections, inFlightQueries,
                    connections * poolingOptions.getMaxRequestsPerConnection(distance));
        }
    }
}, 5, 5, TimeUnit.SECONDS);
```
In real life, you'll probably want something more sophisticated, like exposing a JMX MBean or sending the data to your favorite monitoring tool.
If you find that the current load stays close to or equal to the maximum load at all times, it's a sign that your connection pools are saturated and you should raise the max connections per host, or max requests per connection (protocol v3).

If you're using protocol v2 and the load is often less than core * 128, your pools are underused and you could get away with fewer core connections.
As mentioned above, the default pool size for protocol v3 is core = max = 1. This means all requests to a given node will share a single connection, and therefore a single Netty I/O thread.
There is a corner case where this I/O thread can max out its CPU core and become a bottleneck in the driver; in our benchmarks, this happened with a single-node cluster and a high throughput (approximately 80K requests / second).
It's unlikely that you'll run into this issue: in most real-world deployments, the driver connects to more than one node, so the load will spread across more I/O threads. However, if you suspect that you are experiencing the issue, here's what to look out for:
- an I/O thread maxing out its CPU core; on Linux, per-thread CPU usage can be checked with pidstat -tu. I/O threads are called <cluster_name>-nio-worker-<n>, unless you're injecting your own EventLoopGroup with NettyOptions.

The solution is to add more connections per node. To ensure that additional connections get created before you run into the bottleneck, either: