<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## Load balancing
### Quick overview
Which nodes the driver talks to, and in which order they are tried.
* `basic.load-balancing-policy` in the configuration.
* defaults to `DefaultLoadBalancingPolicy` (opinionated best practices).
* can have per-profile policies.
-----
A Cassandra cluster is typically composed of multiple nodes; the *load balancing policy* (sometimes
abbreviated LBP) is a central component that determines:
* which nodes the driver will communicate with;
* for each new query, which coordinator to pick, and which nodes to use as failover.
It is defined in the [configuration](../configuration/):
```
datastax-java-driver.basic.load-balancing-policy {
class = DefaultLoadBalancingPolicy
}
```
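The same option can also be set with a programmatic config loader instead of the configuration
file. Here is a minimal sketch, assuming the rest of the configuration comes from the usual
`application.conf` / reference defaults:
```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;

// Equivalent of the HOCON snippet above, built programmatically:
DriverConfigLoader loader =
    DriverConfigLoader.programmaticBuilder()
        .withString(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, "DefaultLoadBalancingPolicy")
        .build();

CqlSession session = CqlSession.builder().withConfigLoader(loader).build();
```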
### Concepts
#### Node distance
For each node, the policy computes a *distance* that determines how connections will be established:
* `LOCAL` and `REMOTE` are "active" distances, meaning that the driver will keep open connections to
this node. [Connection pools](../pooling/) can be sized independently for each distance.
* `IGNORED` means that the driver will never attempt to connect.
Typically, the distance will reflect network topology (e.g. local vs. remote datacenter), although
that is entirely up to each policy implementation. It can also change over time.
The driver's built-in policies only ever assign the `LOCAL` or `IGNORED` distance, in order to
avoid cross-datacenter traffic (see below for how to change this behavior).
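If you are curious about the distance currently assigned to each node, you can read it back from
the session metadata; a quick sketch, assuming an existing `session`:
```java
import com.datastax.oss.driver.api.core.metadata.Node;

// Print the distance the policy assigned to each node known to the driver:
for (Node node : session.getMetadata().getNodes().values()) {
  System.out.printf("%s => %s%n", node.getEndPoint(), node.getDistance());
}
```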
#### Query plan
Each time the driver executes a query, it asks the policy to compute a *query plan*, in other words
a list of nodes. The driver then tries each node in sequence, moving down the plan according to the
[retry policy](../retries/) and [speculative execution policy](../speculative_execution/).
The contents and order of query plans are entirely implementation-specific, but policies typically
return plans that:
* are different for each query, in order to balance the load across the cluster;
* only contain nodes that are known to be able to process queries, i.e. neither ignored nor down;
* favor local nodes over remote ones.
### Built-in policies
In previous versions, the driver provided a wide variety of built-in load balancing policies; in
addition, they could be nested into each other, yielding an even higher number of choices. In our
experience, this proved too complicated: it's not obvious which policy (or policies) to choose for
a given use case, and nested policies can interact in subtle and hard-to-predict ways.
In driver 4+, we take a different approach: we provide only a handful of load balancing policies
that we consider the best choices for most cases:
- `DefaultLoadBalancingPolicy` should almost always be used; it requires a local datacenter to be
  specified, either programmatically when creating the session or via the configuration (see below).
  It can also use a highly efficient slow replica avoidance mechanism, which is enabled by default.
- `DcInferringLoadBalancingPolicy` is similar to `DefaultLoadBalancingPolicy`, but does not require
  a local datacenter to be defined: it will attempt to infer the local datacenter from the provided
  contact points. If that's not possible, it will throw an error during session initialization. This
  policy is intended mostly for ETL tools and is not recommended for normal applications.
- `BasicLoadBalancingPolicy` is similar to `DefaultLoadBalancingPolicy`, but lacks the slow replica
  avoidance mechanism. More importantly, it is the only policy capable of operating without a local
  datacenter defined, in which case it considers nodes in the cluster in a datacenter-agnostic way.
  Beware that this can cause spikes in cross-datacenter traffic! This policy is provided mostly as a
  starting point for users wishing to implement their own load balancing policy; it should not be
  used as-is in normal applications.
You can still write a [custom implementation](#custom-implementation) if you have special
requirements.
#### Datacenter locality
By default, both `DefaultLoadBalancingPolicy` and `DcInferringLoadBalancingPolicy` **only connect to
a single datacenter**. The rationale is that a typical multi-region deployment co-locates one or
more application instances with each Cassandra datacenter:
```ditaa
/----+----\
| client |
\----+----/
|
v
/---------------\
| load balancer |
\-------+-------/
|
+------------+------------+
| |
+---------|---------+ +---------|---------+
| Region1 v | | Region2 v |
| /---------\ | | /---------\ |
| | app1 | | | | app2 | |
| \----+----/ | | \----+----/ |
| | | | | |
| v | | v |
| +-----------+ | | +-----------+ |
| | {s} | | | | {s} | |
| | Cassandra +------=------+ Cassandra | |
| | DC1 | | | | DC2 | |
| +-----------+ | | +-----------+ |
| | | |
+-------------------+ +-------------------+
```
When using these policies you **must** provide a local datacenter name, either in the configuration:
```
datastax-java-driver.basic.load-balancing-policy {
local-datacenter = datacenter1
}
```
Or programmatically when building the session:
```java
CqlSession session = CqlSession.builder()
.withLocalDatacenter("datacenter1")
.build();
```
If both are provided, the programmatic value takes precedence.
For convenience, the local datacenter name may be omitted if no contact points were provided: in
that case, the driver connects to 127.0.0.1:9042 and uses that node's datacenter. This is only meant
to provide a better out-of-the-box experience for users who have just downloaded the driver; beyond
that initial development phase, you should provide explicit contact points and a local datacenter.
##### Finding the local datacenter
To check which datacenters are defined in a given cluster, you can run [`nodetool status`]. It will
print information about each node in the cluster, grouped by datacenters. Here is an example:
```
$ nodetool status
Datacenter: DC1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns Host ID Rack
UN <IP1> 1.5 TB 256 ? <ID1> rack1
UN <IP2> 1.5 TB 256 ? <ID2> rack2
UN <IP3> 1.5 TB 256 ? <ID3> rack3
Datacenter: DC2
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns Host ID Rack
UN <IP4> 1.5 TB 256 ? <ID4> rack1
UN <IP5> 1.5 TB 256 ? <ID5> rack2
UN <IP6> 1.5 TB 256 ? <ID6> rack3
```
To find out which datacenter should be considered local, first determine which nodes the driver
will be co-located with, then choose their datacenter as the local one. In case of doubt, you can
also use [cqlsh]: if cqlsh is co-located in the same datacenter, simply run the command below:
```
cqlsh> select data_center from system.local;
data_center
-------------
DC1
```
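The same query can also be issued from the driver itself, for example from an application instance
that is already co-located with its datacenter; a minimal sketch, assuming an existing `session`:
```java
import com.datastax.oss.driver.api.core.cql.Row;

// Ask the node the driver is currently connected to which datacenter it belongs to:
Row row = session.execute("SELECT data_center FROM system.local").one();
System.out.println(row.getString("data_center"));
```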
#### Cross-datacenter failover
Since the driver by default only contacts nodes in the local datacenter, what happens if the whole
datacenter is down? Returning to the diagram above, shouldn't the driver temporarily allow app1 to
connect to the nodes in DC2?
We believe that, while appealing in its simplicity, such a fallback is not the right way to handle a
datacenter failure: if the whole DC1 datacenter went down at once, it probably means a catastrophic
failure happened in Region1, and the application node is down as well. Failover should be
cross-region instead (handled by the load balancer in the example above).
However, due to popular demand, starting with driver 4.10 we re-introduced cross-datacenter
failover in the driver's built-in load balancing policies.
Cross-datacenter failover is enabled with the following configuration option:
```
datastax-java-driver.advanced.load-balancing-policy.dc-failover {
max-nodes-per-remote-dc = 2
}
```
The default for `max-nodes-per-remote-dc` is zero, which means that failover is disabled. Setting
this option to any value greater than zero will have the following effects:
- The load balancing policies will assign the `REMOTE` distance to that many nodes *in each remote
datacenter*.
- The driver will then attempt to open connections to those nodes. The actual number of connections
to open to each one of those nodes is configurable, see [Connection pools](../pooling/) for
more details. By default, the driver opens only one connection to each node.
- Those remote nodes (and only those) will then become eligible for inclusion in query plans,
effectively enabling cross-datacenter failover.
Beware that enabling such failover can result in cross-datacenter network traffic spikes if the
local datacenter is down or experiencing high latencies!
Cross-datacenter failover can also have unexpected consequences when using local consistency levels
(LOCAL_ONE, LOCAL_QUORUM and LOCAL_SERIAL). Indeed, a local consistency level may have different
semantics depending on the replication factor (RF) in use in each datacenter: LOCAL_QUORUM requires
`RF/2 + 1` replicas (integer division), so if the local DC has RF=3 for a given keyspace, 2 replicas
are required there, but if the remote DC has RF=1 for it, only 1 replica will be required after
failover.
For this reason, cross-datacenter failover for local consistency levels is disabled by default. If
you understand the consequences and still want to enable it, set the following option to true:
```
datastax-java-driver.advanced.load-balancing-policy.dc-failover {
allow-for-local-consistency-levels = true
}
```
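Both options can also be set with a programmatic config loader; a minimal sketch (the
`DefaultDriverOption` constant names below assume a recent 4.x driver and mirror the HOCON paths
shown above):
```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;

// Allow failover to 2 nodes per remote DC, including for local consistency levels:
DriverConfigLoader loader =
    DriverConfigLoader.programmaticBuilder()
        .withInt(DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC, 2)
        .withBoolean(
            DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, true)
        .build();

CqlSession session = CqlSession.builder().withConfigLoader(loader).build();
```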
##### Alternatives to driver-level cross-datacenter failover
Before you jump into the failover technique explained above, please also consider the following
alternatives:
1. **Application-level failover**: instead of letting the driver do the failover, implement the
failover logic in your application. Granted, this solution wouldn't be much better if the
application servers are co-located with the Cassandra datacenter itself. It's also a bit more work,
but at least you would have full control over the failover procedure: you could for example decide,
based on the exact error that prevented the local datacenter from fulfilling a given request,
whether a failover makes sense, and which remote datacenter to use for that specific request.
Such fine-grained logic is not possible with driver-level failover. Besides, if you opt for this
approach, execution profiles can come in handy: see "Using multiple policies" below, the sketch
after this list, and our [application-level failover example] for a good starting point.
2. **Infrastructure-level failover**: in this scenario, the failover is handled by the
infrastructure. Returning to our example above, if Region1 goes down, the load balancers in your
infrastructure would transparently switch all the traffic intended for that region to Region2,
possibly scaling up its bandwidth to cope with the network traffic spike. This is by far the best
solution for the cross-datacenter failover issue in general, but we acknowledge that it also
requires a purpose-built infrastructure. To help you explore this option, read our [white paper].
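To illustrate the application-level approach, here is a minimal sketch of a failover wrapper; it
assumes a hypothetical `remote-dc` execution profile declared in the configuration with its own
load balancing policy targeting the remote datacenter:
```java
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

ResultSet executeWithFailover(CqlSession session, SimpleStatement statement) {
  try {
    // Normal path: the default profile's policy only targets the local datacenter.
    return session.execute(statement);
  } catch (AllNodesFailedException e) {
    // All local nodes failed: decide (based on the error, the statement, etc.) whether a
    // failover makes sense, then retry through the hypothetical "remote-dc" profile.
    return session.execute(statement.setExecutionProfileName("remote-dc"));
  }
}
```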
[application-level failover example]: https://github.com/datastax/java-driver/blob/4.x/examples/src/main/java/com/datastax/oss/driver/examples/failover/CrossDatacenterFailover.java
[white paper]: https://www.datastax.com/sites/default/files/content/whitepaper/files/2019-09/Designing-Fault-Tolerant-Applications-DataStax.pdf
#### Token-aware
The default policy is **token-aware**: requests will be routed in priority to the replicas that own
the data being queried.
##### Providing routing information
First make sure that [token metadata](../metadata/token/#configuration) is enabled.
Then your statements need to provide:
* a keyspace: if you use a [per-query keyspace](../statements/per_query_keyspace/), then it will be
used for routing as well. Otherwise, the driver relies on [getRoutingKeyspace()];
* a routing key: it can be provided either by [getRoutingKey()] \(raw binary data) or
[getRoutingToken()] \(already hashed as a token).
Depending on the type of statement, some of this information may be computed automatically;
otherwise you have to set it manually. The examples below assume the following CQL schema:
```
CREATE TABLE testKs.sensor_data(id int, year int, ts timestamp, data double,
PRIMARY KEY ((id, year), ts));
```
For [simple statements](../statements/simple/), routing information is never computed
automatically:
```java
SimpleStatement statement =
SimpleStatement.newInstance(
"SELECT * FROM testKs.sensor_data WHERE id = 1 and year = 2016");
// No routing info available:
assert statement.getRoutingKeyspace() == null;
assert statement.getRoutingKey() == null;
// Set the keyspace manually (skip this if using a per-query keyspace):
statement = statement.setRoutingKeyspace("testKs");
// Set the routing key manually: serialize each partition key component to its target CQL type
statement = statement.setRoutingKey(
TypeCodecs.INT.encodePrimitive(1, session.getContext().getProtocolVersion()),
TypeCodecs.INT.encodePrimitive(2016, session.getContext().getProtocolVersion()));
session.execute(statement);
```
For [bound statements](../statements/prepared/), the keyspace is always available; the routing key
is only available if all components of the partition key are bound as variables:
```java
// All components bound: all info available
PreparedStatement pst1 =
session.prepare("SELECT * FROM testKs.sensor_data WHERE id = :id and year = :year");
BoundStatement statement1 = pst1.bind(1, 2016);
assert statement1.getRoutingKeyspace() != null;
assert statement1.getRoutingKey() != null;
// 'id' hard-coded, only 'year' is bound: only keyspace available
PreparedStatement pst2 =
session.prepare("SELECT * FROM testKs.sensor_data WHERE id = 1 and year = :year");
BoundStatement statement2 = pst2.bind(2016);
assert statement2.getRoutingKeyspace() != null;
assert statement2.getRoutingKey() == null;
```
For [batch statements](../statements/batch/), the routing information of each child statement is
inspected; the first non-null keyspace is used as the keyspace of the batch, and the first non-null
routing key as its routing key (the idea is that all children should have the same routing
information, since batches are supposed to operate on a single partition). If no child has any
routing information, you need to provide it manually.
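For example, a batch whose children are bound statements targeting the same partition inherits
their routing information automatically; a sketch based on the schema above (the insert columns
follow `sensor_data`):
```java
import java.time.Instant;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

PreparedStatement pst =
    session.prepare("INSERT INTO testKs.sensor_data (id, year, ts, data) VALUES (?, ?, ?, ?)");

// Both children target partition (id=1, year=2016), so the batch picks up their routing info:
BatchStatement batch =
    BatchStatement.newInstance(
        DefaultBatchType.UNLOGGED,
        pst.bind(1, 2016, Instant.parse("2016-01-01T00:00:00Z"), 1.0),
        pst.bind(1, 2016, Instant.parse("2016-01-02T00:00:00Z"), 2.0));
assert batch.getRoutingKeyspace() != null;
assert batch.getRoutingKey() != null;
```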
##### Policy behavior
When the policy computes a query plan, it first inspects the statement's routing information. If
there isn't any, the query plan is a simple round-robin shuffle of all connected nodes that are
located in the local datacenter.
If the statement has routing information, the policy uses it to determine the *local* replicas that
hold the corresponding data. Then it returns a query plan containing these replicas shuffled in
random order, followed by a round-robin shuffle of the rest of the nodes.
If cross-datacenter failover has been activated as explained above, some remote nodes may appear in
query plans as well. With the driver built-in policies, remote nodes always come after local nodes
in query plans: this way, if the local datacenter is up, local nodes will be tried first, and remote
nodes are unlikely to ever be queried. If the local datacenter goes down however, all the local
nodes in query plans will likely fail, causing the query plans to eventually try remote nodes
instead. If the local datacenter unavailability persists, local nodes will eventually be marked
down and removed from query plans completely, until they come back up again.
#### Customizing node distance assignment
Finally, all the driver's built-in policies accept an optional *node distance evaluator* that gets
invoked each time a node is added to the cluster or comes back up. If the evaluator returns a
non-null distance for the node, that distance will be used; otherwise the driver will fall back to
its built-in logic to assign a default distance. This is a good way to exclude nodes, or to adjust
their distance according to custom, dynamic criteria.
You can pass the node distance evaluator through the configuration:
```
datastax-java-driver.basic.load-balancing-policy {
class = DefaultLoadBalancingPolicy
local-datacenter = datacenter1
evaluator.class = com.acme.MyNodeDistanceEvaluator
}
```
The node distance evaluator class must implement [NodeDistanceEvaluator], and have a public
constructor that takes a [DriverContext] argument: `public MyNodeDistanceEvaluator(DriverContext
context)`.
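As an illustration, here is a minimal sketch of such a class; the "maintenance" rack check is a
made-up criterion, and returning `null` lets the policy fall back to its default logic:
```java
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator;
import com.datastax.oss.driver.api.core.metadata.Node;

public class MyNodeDistanceEvaluator implements NodeDistanceEvaluator {

  public MyNodeDistanceEvaluator(DriverContext context) {
    // The context gives access to the driver configuration, metadata, etc.
  }

  @Override
  public NodeDistance evaluateDistance(Node node, String localDc) {
    // Made-up example: never connect to nodes in a rack named "maintenance".
    if ("maintenance".equals(node.getRack())) {
      return NodeDistance.IGNORED;
    }
    return null; // let the policy apply its default logic
  }
}
```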
Sometimes it's more convenient to pass the evaluator programmatically; you can do that with
`SessionBuilder.withNodeDistanceEvaluator`:
```java
Map<Node, NodeDistance> distances = ...
CqlSession session = CqlSession.builder()
.withNodeDistanceEvaluator((node, dc) -> distances.get(node))
.build();
```
If a node distance evaluator is provided programmatically, the configuration option is ignored.
### Custom implementation
You can use your own implementation by specifying its fully-qualified name in the configuration.
Study the [LoadBalancingPolicy] interface and the built-in [BasicLoadBalancingPolicy] for the
low-level details. Feel free to extend `BasicLoadBalancingPolicy` and override only the methods
that you wish to modify, but keep in mind that it may be simpler to just start from scratch.
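If you do decide to extend the built-in policy, a minimal skeleton could look like the sketch
below; the two-argument constructor is what the driver expects when it instantiates a policy
declared in the configuration:
```java
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy;
import java.util.Queue;

public class CustomLoadBalancingPolicy extends BasicLoadBalancingPolicy {

  public CustomLoadBalancingPolicy(DriverContext context, String profileName) {
    super(context, profileName);
  }

  @Override
  public Queue<Node> newQueryPlan(Request request, Session session) {
    Queue<Node> plan = super.newQueryPlan(request, session);
    // Post-process the parent's plan here (filter, reorder, log, ...).
    return plan;
  }
}
```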
### Using multiple policies
The load balancing policy can be overridden in [execution profiles](../configuration/#profiles):
```
datastax-java-driver {
basic.load-balancing-policy {
class = DefaultLoadBalancingPolicy
}
profiles {
custom-lbp {
basic.load-balancing-policy {
class = CustomLoadBalancingPolicy
}
}
slow {
request.timeout = 30 seconds
}
}
}
```
The `custom-lbp` profile uses a dedicated policy. The `slow` profile inherits the default profile's
policy.
Note that this goes beyond configuration inheritance: the driver only creates a single
`DefaultLoadBalancingPolicy` instance and reuses it (this also occurs if two sibling profiles have
the same configuration).
For query plans, each request uses its declared profile's policy. If it doesn't declare any profile,
or if the profile doesn't have a dedicated policy, then the default profile's policy is used.
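For example, to route a given request through the `custom-lbp` profile declared above:
```java
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

// This request will use CustomLoadBalancingPolicy instead of the default policy:
SimpleStatement statement =
    SimpleStatement.newInstance("SELECT * FROM testKs.sensor_data WHERE id = 1 and year = 2016")
        .setExecutionProfileName("custom-lbp");
session.execute(statement);
```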
For node distances, the driver remembers the last distance suggested by each policy for each node.
Then it uses the "closest" distance for any given node. For example:
* for node1, policy1 suggests distance LOCAL and policy2 suggests REMOTE. node1 is set to LOCAL;
* policy1 changes its suggestion to IGNORED. node1 is set to REMOTE;
* policy1 changes its suggestion to REMOTE. node1 stays at REMOTE.
[DriverContext]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/context/DriverContext.html
[LoadBalancingPolicy]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/loadbalancing/LoadBalancingPolicy.html
[BasicLoadBalancingPolicy]: https://github.com/datastax/java-driver/blob/4.x/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java
[getRoutingKeyspace()]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/session/Request.html#getRoutingKeyspace--
[getRoutingToken()]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/session/Request.html#getRoutingToken--
[getRoutingKey()]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/session/Request.html#getRoutingKey--
[NodeDistanceEvaluator]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/loadbalancing/NodeDistanceEvaluator.html
[`nodetool status`]: https://docs.datastax.com/en/dse/6.7/dse-dev/datastax_enterprise/tools/nodetool/toolsStatus.html
[cqlsh]: https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/startCqlshStandalone.html