| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| ## Speculative query execution |
| |
| ### Quick overview |
| |
| Pre-emptively query another node if the current one takes too long to respond. |
| |
| * `advanced.speculative-execution-policy` in the configuration. |
| * disabled by default. Also available: constant delay, or write your own policy. |
| * can have per-profile policies. |
| * only kicks in if the query is idempotent. |
| * creates more traffic: tune your pool and provision your cluster accordingly. |
| |
| ----- |
| |
| Sometimes a Cassandra node might be experiencing difficulties (ex: long GC pause) and take longer |
| than usual to reply. Queries sent to that node will experience bad latency. |
| |
| One thing we can do to improve that is pre-emptively start a second execution of the query against |
| another node, before the first node has replied or errored out. If that second node replies faster, |
| we can send the response back to the client. We also cancel the first execution (note that |
| "cancelling" in this context simply means discarding the response when it arrives later, Cassandra |
| does not support cancellation of in flight requests): |
| |
| ```ditaa |
| client driver exec1 exec2 |
| --+----------------+--------------+------+--- |
| | execute(query) | |
| |--------------->| |
| | | query node1 |
| | |------------->| |
| | | | |
| | | | |
| | | query node2 |
| | |-------------------->| |
| | | | | |
| | | | | |
| | | node2 replies | |
| | |<--------------------| |
| | complete | | |
| |<---------------| | |
| | | cancel | |
| | |------------->| |
| ``` |
| |
| Or the first node could reply just after the second execution was started. In this case, we cancel |
| the second execution. In other words, whichever node replies faster "wins" and completes the client |
| query: |
| |
| ```ditaa |
| client driver exec1 exec2 |
| --+----------------+--------------+------+--- |
| | execute(query) | |
| |--------------->| |
| | | query node1 |
| | |------------->| |
| | | | |
| | | | |
| | | query node2 |
| | |-------------------->| |
| | | | | |
| | | | | |
| | | node1 replies| | |
| | |<-------------| | |
| | complete | | |
| |<---------------| | |
| | | cancel | |
| | |-------------------->| |
| ``` |
| |
| Speculative executions are **disabled** by default. The following sections cover the practical |
| details and how to enable them. |
| |
| ### Query idempotence |
| |
| If a query is [not idempotent](../idempotence/), the driver will never schedule speculative |
| executions for it, because there is no way to guarantee that only one node will apply the mutation. |
| |
| ### Configuration |
| |
| Speculative executions are controlled by a policy defined in the [configuration](../configuration/). |
| The default implementation never schedules an execution: |
| |
| ``` |
| datastax-java-driver.advanced.speculative-execution-policy { |
| class = NoSpeculativeExecutionPolicy |
| } |
| ``` |
| |
| The "constant" policy schedules executions at a fixed delay: |
| |
| ``` |
| datastax-java-driver.advanced.speculative-execution-policy { |
| class = ConstantSpeculativeExecutionPolicy |
| |
| # The maximum number of executions (including the initial, non-speculative execution). |
| # This must be at least one. |
| max-executions = 3 |
| |
| # The delay between each execution. 0 is allowed, and will result in all executions being sent |
| # simultaneously when the request starts. |
| # Note that sub-millisecond precision is not supported, any excess precision information will |
| # be dropped; in particular, delays of less than 1 millisecond are equivalent to 0. |
| # This must be positive or 0. |
| delay = 100 milliseconds |
| } |
| ``` |
| |
| Given the above configuration, an idempotent query would be handled this way: |
| |
| * start the initial execution at t0; |
| * if no response has been received at t0 + 100 milliseconds, start a speculative execution on |
| another node; |
| * if no response has been received at t0 + 200 milliseconds, start another speculative execution on |
| a third node; |
| * past that point, don't query other nodes, just wait for the first response to arrive. |
| |
| Finally, you can create your own policy by implementing [SpeculativeExecutionPolicy], and |
| referencing your implementation class from the configuration. |
| |
| ### How speculative executions affect retries |
| |
| Turning on speculative executions doesn't change the driver's [retry](../retries/) behavior. Each |
| parallel execution will trigger retries independently: |
| |
| ```ditaa |
| client driver exec1 exec2 |
| --+----------------+--------------+------+--- |
| | execute(query) | |
| |--------------->| |
| | | query node1 |
| | |------------->| |
| | | | |
| | | unavailable | |
| | |<-------------| |
| | | |
| | |retry at lower CL |
| | |------------->| |
| | | | |
| | | query node2 |
| | |-------------------->| |
| | | | | |
| | | server error | |
| | |<--------------------| |
| | | | |
| | | retry on node3 |
| | |-------------------->| |
| | | | | |
| | | node1 replies| | |
| | |<-------------| | |
| | complete | | |
| |<---------------| | |
| | | cancel | |
| | |-------------------->| |
| ``` |
| |
| The only impact is that all executions of the same query always share the same query plan, so each |
| node will be used by at most one execution. |
| |
| ### Tuning and practical details |
| |
| The goal of speculative executions is to improve overall latency (the time between `execute(query)` |
| and `complete` in the diagrams above) at high percentiles. On the flip side, too many speculative |
| executions increase the pressure on the cluster. |
| |
| If you use speculative executions to avoid unhealthy nodes, a good-behaving node should rarely hit |
| the threshold. We recommend running a benchmark on a healthy platform (all nodes up and healthy) and |
| monitoring the request percentiles with the `cql-requests` [metric](../metrics/). Then use the |
| latency at a high percentile (for example p99.9) as the threshold. |
| |
| Alternatively, maybe low latency is your absolute priority, and you are willing to take the |
| increased throughput as a tradeoff. In that case, set the threshold to 0 and provision your cluster |
| accordingly. |
| |
| You can monitor the number of speculative executions triggered by each node with the |
| `speculative-executions` [metric](../metrics/). |
| |
| #### Stream id exhaustion |
| |
| One side-effect of speculative executions is that many requests get cancelled, which can lead to a |
| phenomenon called *stream id exhaustion*: each TCP connection can handle multiple simultaneous |
| requests, identified by a unique number called *stream id* (see also the [pooling](../pooling/) |
| section). When a request gets cancelled, we can't reuse its stream id immediately because we might |
| still receive a response from the server later. If this happens often, the number of available |
| stream ids diminishes over time, and when it goes below a given threshold we close the connection |
| and create a new one. If requests are often cancelled, you will see connections being recycled at a |
| high rate. |
| |
| The best way to monitor this is to compare the `pool.orphaned-streams` [metric](../metrics/) to the |
| total number of available stream ids (which can be computed from the configuration: |
| `pool.local.size * max-requests-per-connection`). The `pool.available-streams` and `pool.in-flight` |
| metrics will also give you an idea of how many stream ids are left for active queries. |
| |
| #### Request ordering |
| |
| Note: ordering issues are only a problem with [server-side timestamps](../query_timestamps/), which |
| are not the default anymore in driver 4+. So unless you've explicitly enabled |
| `ServerSideTimestampGenerator`, you can skip this section. |
| |
| Suppose you run the following query with speculative executions and server-side timestamps enabled: |
| |
| insert into my_table (k, v) values (1, 1); |
| |
| The first execution is a bit too slow, so a second execution gets triggered. Finally, the first |
| execution completes, so the client code gets back an acknowledgement, and the second execution is |
| cancelled. However, cancelling only means that the driver stops waiting for the server's response, |
| the request could still be "on the wire"; let's assume that this is the case. |
| |
| Now you run the following query, which completes successfully: |
| |
| delete from my_table where k = 1; |
| |
| But now the second execution of the first query finally reaches its target node, which applies the |
| mutation. The row that you've just deleted is back! |
| |
| The workaround is to either specify a timestamp in your CQL queries: |
| |
| insert into my_table (k, v) values (1, 1) USING TIMESTAMP 1432764000; |
| |
| Or use a client-side [timestamp generator](../query_timestamps/). |
| |
| ### Using multiple policies |
| |
| The speculative execution policy can be overridden in [execution |
| profiles](../configuration/#profiles): |
| |
| ``` |
| datastax-java-driver { |
| advanced.speculative-execution-policy { |
| class = ConstantSpeculativeExecutionPolicy |
| max-executions = 3 |
| delay = 100 milliseconds |
| } |
| profiles { |
| oltp { |
| basic.request.timeout = 100 milliseconds |
| } |
| olap { |
| basic.request.timeout = 30 seconds |
| advanced.speculative-execution-policy.class = NoSpeculativeExecutionPolicy |
| } |
| } |
| } |
| ``` |
| |
| The `olap` profile uses its own policy. The `oltp` profile inherits the default profile's. Note that |
| this goes beyond configuration inheritance: the driver only creates a single |
| `ConstantSpeculativeExecutionPolicy` instance and reuses it (this also occurs if two sibling |
| profiles have the same configuration). |
| |
| Each request uses its declared profile's policy. If it doesn't declare any profile, or if the |
| profile doesn't have a dedicated policy, then the default profile's policy is used. |
| |
| [SpeculativeExecutionPolicy]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/specex/SpeculativeExecutionPolicy.html |