| ## Load balancing |
| |
| A Cassandra cluster is typically composed of multiple hosts; the [LoadBalancingPolicy] \(sometimes abbreviated LBP) is a |
| central component that determines: |
| |
| * which hosts the driver will communicate with; |
| * for each new query, which coordinator to pick, and which hosts to use as failover. |
| |
| The policy is configured when initializing the cluster: |
| |
| ```java |
| Cluster cluster = Cluster.builder() |
| .addContactPoint("127.0.0.1") |
| .withLoadBalancingPolicy(new RoundRobinPolicy()) |
| .build(); |
| ``` |
| |
| Once the cluster has been built, you can't change the policy, but you may inspect it at runtime: |
| |
| ```java |
| LoadBalancingPolicy lbp = cluster.getConfiguration().getPolicies().getLoadBalancingPolicy(); |
| ``` |
| |
| If you don't explicitly configure the policy, you get the default, which is a |
| [datacenter-aware](#dc-aware-round-robin-policy), [token-aware](#token-aware-policy) policy: |
| |
| ```java |
| new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()); |
| ``` |
| |
| ### Common concepts |
| |
| Before we review the available implementations, we need to introduce two concepts: |
| |
| #### Host distance |
| |
| For each host, the policy computes a **[distance][HostDistance]** that determines how the driver will establish connections |
| to it: |
| |
| * `LOCAL` and `REMOTE` are "active" distances, meaning that the driver will keep open connections to the host. They |
| differ in the number of connections opened, depending on your [pooling options](../pooling/). Also, the |
| [control connection](../control_connection/) will favor local nodes if possible. |
| * `IGNORED`, as the name suggests, means that the driver will not attempt to connect. |
| |
| Typically, the distance will reflect network topology (e.g. local vs. remote datacenter), although that is entirely up |
| to your policy. The distance can be dynamic: the driver re-checks it whenever connection pools are created (e.g. at |
| startup or if a node was down and just came back up); you can also trigger it with [refreshConnectedHosts]: |
| |
| ```java |
| // Re-evaluate all host distances: |
| cluster.getConfiguration().getPoolingOptions().refreshConnectedHosts(); |
| |
| // Re-evaluate the distance for a given host: |
| cluster.getConfiguration().getPoolingOptions().refreshConnectedHost(host); |
| ``` |
| |
| #### Query plan |
| |
| Each time the driver executes a query, it asks the policy to compute a **query plan**, which is a list of hosts. The |
| driver will then try each host in sequence, according to the [retry policy](../retries/) and |
[speculative execution policy](../speculative_execution/).
| |
| The contents and order of query plans are entirely up to your policy, but implementations typically return plans that: |
| |
| * are different for each query, in order to balance the load across the cluster; |
| * only contain hosts that are known to be able to process queries, i.e. neither ignored nor down; |
| * favor local hosts over remote ones. |
| |
| The next sections describe the implementations that are provided with the driver. |
| |
| |
| ### [RoundRobinPolicy] |
| |
| ```java |
| Cluster cluster = Cluster.builder() |
| .addContactPoint("127.0.0.1") |
| .withLoadBalancingPolicy(new RoundRobinPolicy()) |
| .build(); |
| ``` |
| |
This is the most straightforward implementation. It returns query plans that include all hosts, shifted for each
query in a round-robin fashion. For example:
| |
| * query 1: host1, host2, host3 |
| * query 2: host2, host3, host1 |
| * query 3: host3, host1, host2 |
| * query 4: host1, host2, host3 |
| * etc. |
| |
| All hosts are at distance `LOCAL`. |
| |
| This works well for simple deployments. If you have multiple datacenters, it will be inefficient and you probably want |
| to switch to a DC-aware policy. |
| |
| |
| ### [DCAwareRoundRobinPolicy] |
| |
| ```java |
| Cluster cluster = Cluster.builder() |
| .addContactPoint("127.0.0.1") |
| .withLoadBalancingPolicy( |
| DCAwareRoundRobinPolicy.builder() |
| .withLocalDc("myLocalDC") |
| .withUsedHostsPerRemoteDc(2) |
| .allowRemoteDCsForLocalConsistencyLevel() |
| .build() |
| ).build(); |
| ``` |
| |
This policy queries nodes of the local datacenter in a round-robin fashion; optionally, it can also try a configurable
number of hosts in remote datacenters if all local hosts have failed.
| |
| Call `withLocalDc` to specify the name of your local datacenter. You can also leave it out, and the driver will use the |
| datacenter of the first contact point that was reached [at initialization](../#cluster-initialization). However, |
| remember that the driver shuffles the initial list of contact points, so this assumes that all contact points are in the |
| local datacenter. In general, providing the datacenter name explicitly is a safer option. |
| |
| Hosts belonging to the local datacenter are at distance `LOCAL`, and appear first in query plans (in a round-robin |
| fashion). |
| |
| If you call `withUsedHostsPerRemoteDc`, the policy will pick that number of hosts for each remote DC, and add them at |
| the end of query plans. To illustrate this, let's assume that the value is 2, there are 3 datacenters and 3 hosts in the |
| local datacenter. Query plans would look like this: |
| |
| * query 1: localHost1, localHost2, localHost3, host1InRemoteDc1, host2InRemoteDc1, host1InRemoteDc2, host2InRemoteDc2 |
| * query 2: localHost2, localHost3, localHost1, host1InRemoteDc1, host2InRemoteDc1, host1InRemoteDc2, host2InRemoteDc2 |
| * query 3: localHost3, localHost1, localHost2, host1InRemoteDc1, host2InRemoteDc1, host1InRemoteDc2, host2InRemoteDc2 |
| |
| Hosts selected by this option are at distance `REMOTE`. Note that they always appear in the same order. |
| |
| Finally, `allowRemoteDCsForLocalConsistencyLevel` controls whether remote hosts included by the previous option are |
| included when the consistency level of the query is `LOCAL_ONE` or `LOCAL_QUORUM`. By default, it is off (remote hosts |
| are not included for local CLs). |
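
To make the effect concrete, here is a minimal sketch (the table, values and consistency level are purely illustrative):
with `allowRemoteDCsForLocalConsistencyLevel()` enabled as in the builder above, the remote hosts selected by
`withUsedHostsPerRemoteDc` are still appended to the query plan of a `LOCAL_QUORUM` query; without it, they would be
omitted for that query.

```java
// Hypothetical query at a "local" consistency level:
Statement statement = new SimpleStatement(
        "SELECT * FROM myKs.myTable WHERE id = 1")
        .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
session.execute(statement);
```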
| |
| |
| ### [TokenAwarePolicy] |
| |
| ```java |
| Cluster cluster = Cluster.builder() |
| .addContactPoint("127.0.0.1") |
| .withLoadBalancingPolicy(new TokenAwarePolicy(anotherPolicy)) |
| .build(); |
| ``` |
| |
| This policy adds **token awareness** on top of another policy: requests will be routed in priority to the local replicas |
| that own the data that is being queried. |
| |
| #### Requirements |
| |
| In order for token awareness to work, you should first ensure that metadata is enabled in the driver. That is the case |
| by default, unless it's been explicitly disabled by [QueryOptions#setMetadataEnabled][setMetadataEnabled]. |
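
If you want to set it explicitly (for example, to re-enable it after it was turned off elsewhere in your code), a
minimal sketch:

```java
// Token metadata is on by default; this only shows where the setting lives:
QueryOptions queryOptions = new QueryOptions().setMetadataEnabled(true);

Cluster cluster = Cluster.builder()
    .addContactPoint("127.0.0.1")
    .withQueryOptions(queryOptions)
    .build();
```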
| |
| Then you need to consider whether routing information (provided by [Statement#getKeyspace] and |
| [Statement#getRoutingKey]) can be computed automatically for your statements; if not, you may provide it yourself (if a |
| statement has no routing information, the query will still be executed, but token awareness will not work, so the driver |
| might not pick the best coordinator). |
| |
| The examples assume the following CQL schema: |
| |
| ``` |
| CREATE TABLE testKs.sensor_data(id int, year int, ts timestamp, data double, |
| PRIMARY KEY ((id, year), ts)); |
| ``` |
| |
| For [simple statements](../statements/simple/), routing information can never be computed automatically: |
| |
| ```java |
| SimpleStatement statement = new SimpleStatement( |
| "SELECT * FROM testKs.sensor_data WHERE id = 1 and year = 2016"); |
| |
| // No routing info available: |
| assert statement.getKeyspace() == null; |
| assert statement.getRoutingKey() == null; |
| |
| // Set the keyspace manually: |
| statement.setKeyspace("testKs"); |
| |
| // Set the routing key manually: serialize each partition key component to its target CQL type |
ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
| statement.setRoutingKey( |
| TypeCodec.cint().serialize(1, protocolVersion), |
| TypeCodec.cint().serialize(2016, protocolVersion)); |
| |
| session.execute(statement); |
| ``` |
| |
| For [built statements](../statements/built/), the keyspace is available if it was provided while building the query; the |
| routing key is available only if the statement was built using the table metadata, and all components of the partition |
| key appear in the query: |
| |
| ```java |
| TableMetadata tableMetadata = cluster.getMetadata() |
| .getKeyspace("testKs") |
| .getTable("sensor_data"); |
| |
| // Built from metadata: all info available |
| BuiltStatement statement1 = select().from(tableMetadata) |
| .where(eq("id", 1)) |
| .and(eq("year", 2016)); |
| |
| assert statement1.getKeyspace() != null; |
| assert statement1.getRoutingKey() != null; |
| |
| // Built from keyspace and table name: only keyspace available |
| BuiltStatement statement2 = select().from("testKs", "sensor") |
| .where(eq("id", 1)) |
| .and(eq("year", 2016)); |
| |
| assert statement2.getKeyspace() != null; |
| assert statement2.getRoutingKey() == null; |
| ``` |
| |
| For [bound statements](../statements/prepared/), the keyspace is always available; the routing key is only available if |
| all components of the partition key are bound as variables: |
| |
| ```java |
| // All components bound: all info available |
| PreparedStatement pst1 = session.prepare("SELECT * FROM testKs.sensor_data WHERE id = :id and year = :year"); |
| BoundStatement statement1 = pst1.bind(1, 2016); |
| |
| assert statement1.getKeyspace() != null; |
| assert statement1.getRoutingKey() != null; |
| |
| // 'id' hard-coded, only 'year' is bound: only keyspace available |
| PreparedStatement pst2 = session.prepare("SELECT * FROM testKs.sensor_data WHERE id = 1 and year = :year"); |
| BoundStatement statement2 = pst2.bind(2016); |
| |
| assert statement2.getKeyspace() != null; |
| assert statement2.getRoutingKey() == null; |
| ``` |
| |
| For [batch statements](../statements/batch/), the routing information of each child statement is inspected; the first |
| non-null keyspace is used as the keyspace of the batch, and the first non-null routing key as its routing key (the idea |
is that all children should have the same routing information, since batches are supposed to operate on a single
| partition). All children might have null information, in which case you need to provide the information manually as |
| shown previously. |
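
As a minimal sketch (the `INSERT` statement and values are illustrative), a batch whose children were prepared against
the same table exposes their routing information:

```java
PreparedStatement pst = session.prepare(
        "INSERT INTO testKs.sensor_data (id, year, ts, data) VALUES (:id, :year, :ts, :data)");

BatchStatement batch = new BatchStatement();
batch.add(pst.bind(1, 2016, new Date(), 2.5));
batch.add(pst.bind(1, 2016, new Date(), 3.8));

// Both children target the same partition, so the batch inherits their routing info:
assert batch.getKeyspace() != null;
assert batch.getRoutingKey() != null;
```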
| |
| #### Behavior |
| |
| For any host, the distance returned by `TokenAwarePolicy` is always the same as its child policy. |
| |
| When the policy computes a query plan, it will first inspect the statement's routing information. If there is none, the |
| policy simply acts as a pass-through, and returns the query plan computed by its child policy. |
| |
| If the statement has routing information, the policy uses it to determine the replicas that hold the corresponding data. |
| Then it returns a query plan containing: |
| |
| 1. the replicas for which the child policy returns distance `LOCAL`, shuffled in a random order; |
2. followed by the query plan of the child policy, skipping any hosts that were already returned by the previous step.
| |
| Finally, the `shuffleReplicas` constructor parameter allows you to control whether the policy shuffles the replicas in |
| step 1: |
| |
| ```java |
| new TokenAwarePolicy(anotherPolicy, false); // no shuffling |
| ``` |
| |
| Shuffling will distribute writes better, and can alleviate hotspots caused by "fat" partitions. On the other hand, |
| setting it to `false` might increase the effectiveness of caching, since data will always be retrieved from the |
| "primary" replica. Shuffling is enabled by default. |
| |
| ### [LatencyAwarePolicy] |
| |
| ```java |
| Cluster cluster = Cluster.builder() |
| .addContactPoint("127.0.0.1") |
| .withLoadBalancingPolicy( |
| LatencyAwarePolicy.builder(anotherPolicy) |
| .withExclusionThreshold(2.0) |
| .withScale(100, TimeUnit.MILLISECONDS) |
| .withRetryPeriod(10, TimeUnit.SECONDS) |
| .withUpdateRate(100, TimeUnit.MILLISECONDS) |
| .withMininumMeasurements(50) |
| .build() |
| ).build(); |
| ``` |
| |
| This policy adds **latency awareness** on top of another policy: it collects the latencies of queries to each host, and |
| will exclude the worst-performing hosts from query plans. |
| |
The builder allows you to customize various aspects of the policy:
| |
* the [exclusion threshold][withExclusionThreshold] controls how much worse a host must perform (compared to the fastest
  host) in order to be excluded. For example, 2 means that hosts that are at least twice as slow as the fastest host
  will be excluded;
| * since a host's performance can vary over time, its score is computed with a time-weighted average; the |
| [scale][withScale] controls how fast the weight given to older latencies decreases over time; |
| * the [retry period][withRetryPeriod] is the duration for which a slow host will be penalized; |
| * the [update rate][withUpdateRate] defines how often the minimum average latency (i.e. the fastest host) is recomputed; |
| * the [minimum measurements][withMininumMeasurements] threshold guarantees that we have enough measurements before we |
| start excluding a host. This prevents skewing the measurements during a node restart, where JVM warm-up will influence |
| latencies. |
| |
| For any host, the distance returned by the policy is always the same as its child policy. |
| |
| Query plans are based on the child policy's, except that hosts that are currently excluded for being too slow are moved |
| to the end of the plan. |
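
If you want to observe the measurements at runtime, the policy exposes a snapshot of its per-host statistics through
`getScoresSnapshot`. A minimal sketch (the formatting is arbitrary):

```java
LatencyAwarePolicy latencyAwarePolicy = LatencyAwarePolicy.builder(anotherPolicy).build();
// ... later, after the cluster has been handling traffic for a while:
LatencyAwarePolicy.Snapshot snapshot = latencyAwarePolicy.getScoresSnapshot();
for (Map.Entry<Host, LatencyAwarePolicy.Snapshot.Stats> entry : snapshot.getAllStats().entrySet()) {
    System.out.printf("%s: %d measurements, score %d ns%n",
            entry.getKey(),
            entry.getValue().getMeasurementsCount(),
            entry.getValue().getLatencyScore());
}
```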
| |
| [withExclusionThreshold]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/LatencyAwarePolicy.Builder.html#withExclusionThreshold-double- |
| [withMininumMeasurements]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/LatencyAwarePolicy.Builder.html#withMininumMeasurements-int- |
| [withRetryPeriod]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/LatencyAwarePolicy.Builder.html#withRetryPeriod-long-java.util.concurrent.TimeUnit- |
| [withScale]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/LatencyAwarePolicy.Builder.html#withScale-long-java.util.concurrent.TimeUnit- |
| [withUpdateRate]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/LatencyAwarePolicy.Builder.html#withUpdateRate-long-java.util.concurrent.TimeUnit- |
| |
| ### Filtering policies |
| |
| [WhiteListPolicy] wraps another policy with a white list, to ensure that the driver will only ever connect to a |
| pre-defined subset of the cluster. The distance will be that of the child policy for hosts that are in the white list, |
| and `IGNORED` otherwise. Query plans are guaranteed to only contain white-listed hosts. |
| |
| [HostFilterPolicy] is a generalization of that concept, where you provide the predicate that will determine if a host is |
| included or not. |
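
A minimal sketch of both (the addresses, port and datacenter name are placeholders for your own deployment):

```java
// White list: the driver will only ever connect to these two hosts
// (assuming the default native transport port, 9042):
LoadBalancingPolicy whiteList = new WhiteListPolicy(
        DCAwareRoundRobinPolicy.builder().build(),
        Arrays.asList(
                new InetSocketAddress("10.0.0.1", 9042),
                new InetSocketAddress("10.0.0.2", 9042)));

// Host filter: ignore hosts matching an arbitrary predicate (here, a whole datacenter):
LoadBalancingPolicy filtered = new HostFilterPolicy(
        DCAwareRoundRobinPolicy.builder().build(),
        new Predicate<Host>() {
            @Override
            public boolean apply(Host host) {
                return !"DC_backup".equals(host.getDatacenter());
            }
        });
```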
| |
| ### Implementing your own |
| |
| If none of the provided policies fit your use case, you can write your own. This is an advanced topic, so we recommend |
| studying the existing implementations first: `RoundRobinPolicy` is a good place to start, then you can look at more |
| complex ones like `DCAwareRoundRobinPolicy`. |
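
For illustration only, here is a rough sketch (not production-ready: no logging, simplified concurrency handling) of a
policy that treats every host as `LOCAL` and hands out round-robin query plans:

```java
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.google.common.collect.AbstractIterator;

public class SimplePolicy implements LoadBalancingPolicy {

    private final CopyOnWriteArrayList<Host> liveHosts = new CopyOnWriteArrayList<Host>();
    private final AtomicInteger index = new AtomicInteger();

    @Override
    public void init(Cluster cluster, Collection<Host> hosts) {
        liveHosts.addAll(hosts);
    }

    @Override
    public HostDistance distance(Host host) {
        // Every host gets an active connection pool:
        return HostDistance.LOCAL;
    }

    @Override
    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
        // Take a snapshot of the live hosts and shift the starting point for each query:
        final List<Host> hosts = new ArrayList<Host>(liveHosts);
        final int startIndex = index.getAndIncrement();
        return new AbstractIterator<Host>() {
            private int remaining = hosts.size();
            private int position = startIndex;

            @Override
            protected Host computeNext() {
                if (remaining-- <= 0)
                    return endOfData();
                return hosts.get(Math.abs(position++ % hosts.size()));
            }
        };
    }

    @Override
    public void onAdd(Host host) { liveHosts.addIfAbsent(host); }

    @Override
    public void onUp(Host host) { liveHosts.addIfAbsent(host); }

    @Override
    public void onDown(Host host) { liveHosts.remove(host); }

    @Override
    public void onRemove(Host host) { liveHosts.remove(host); }

    @Override
    public void close() { /* no resources to release */ }
}
```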
| |
| |
| [LoadBalancingPolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/LoadBalancingPolicy.html |
| [RoundRobinPolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/RoundRobinPolicy.html |
| [DCAwareRoundRobinPolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.html |
| [TokenAwarePolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/TokenAwarePolicy.html |
| [LatencyAwarePolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/LatencyAwarePolicy.html |
| [HostFilterPolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/HostFilterPolicy.html |
| [WhiteListPolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/WhiteListPolicy.html |
| [HostDistance]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/HostDistance.html |
| [refreshConnectedHosts]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/PoolingOptions.html#refreshConnectedHosts-- |
| [setMetadataEnabled]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/QueryOptions.html#setMetadataEnabled-boolean- |
| [Statement#getKeyspace]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/Statement.html#getKeyspace-- |
| [Statement#getRoutingKey]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/Statement.html#getRoutingKey-- |