| = Improved Internode Messaging |
| |
| Apache Cassandra 4.0 has added several new improvements to internode |
| messaging. |
| |
| == Optimized Internode Messaging Protocol |
| |
| The internode messaging protocol has been optimized |
| (https://issues.apache.org/jira/browse/CASSANDRA-14485[CASSANDRA-14485]). |
| Previously the `IPAddressAndPort` of the sender was included with each |
| message that was sent even though the `IPAddressAndPort` had already |
| been sent once when the initial connection/session was established. In |
| Cassandra 4.0 `IPAddressAndPort` has been removed from every separate |
| message sent and only sent when connection/session is initiated. |
| |
| Another improvement is that at several instances (listed) a fixed 4-byte |
| integer value has been replaced with `vint` as a `vint` is almost always |
| less than 1 byte: |
| |
| * The `paramSize` (the number of parameters in the header) |
| * Each individual parameter value |
| * The `payloadSize` |
| |
| == NIO Messaging |
| |
| In Cassandra 4.0 peer-to-peer (internode) messaging has been switched to |
| non-blocking I/O (NIO) via Netty |
| (https://issues.apache.org/jira/browse/CASSANDRA-8457[CASSANDRA-8457]). |
| |
| As serialization format, each message contains a header with several |
| fixed fields, an optional key-value parameters section, and then the |
| message payload itself. Note: the IP address in the header may be either |
| IPv4 (4 bytes) or IPv6 (16 bytes). |
| |
| ____ |
| The diagram below shows the IPv4 address for brevity. |
| ____ |
| |
| .... |
| 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 6 6 |
| 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | PROTOCOL MAGIC | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | Message ID | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | Timestamp | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | Addr len | IP Address (IPv4) / |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| / | Verb / |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| / | Parameters size / |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| / | Parameter data / |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| / | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | Payload size | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | / |
| / Payload / |
| / | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| .... |
| |
| An individual parameter has a String key and a byte array value. The key |
| is serialized with its length, encoded as two bytes, followed by the |
| UTF-8 byte encoding of the string. The body is serialized with its |
| length, encoded as four bytes, followed by the bytes of the value. |
| |
| == Resource limits on Queued Messages |
| |
| System stability is improved by enforcing strict resource limits |
| (https://issues.apache.org/jira/browse/CASSANDRA-15066[CASSANDRA-15066]) |
| on the number of outbound messages that are queued, measured by the |
| `serializedSize` of the message. There are three separate limits imposed |
| simultaneously to ensure that progress is always made without any |
| reasonable combination of failures impacting a node’s stability. |
| |
| [arabic] |
| . Global, per-endpoint and per-connection limits are imposed on messages |
| queued for delivery to other nodes and waiting to be processed on |
| arrival from other nodes in the cluster. These limits are applied to the |
| on-wire size of the message being sent or received. |
| . The basic per-link limit is consumed in isolation before any endpoint |
| or global limit is imposed. Each node-pair has three links: urgent, |
| small and large. So any given node may have a maximum of |
| `N*3 * (internode_application_send_queue_capacity_in_bytes + internode_application_receive_queue_capacity_in_bytes)` |
| messages queued without any coordination between them although in |
| practice, with token-aware routing, only RF*tokens nodes should need to |
| communicate with significant bandwidth. |
| . The per-endpoint limit is imposed on all messages exceeding the |
| per-link limit, simultaneously with the global limit, on all links to or |
| from a single node in the cluster. The global limit is imposed on all |
| messages exceeding the per-link limit, simultaneously with the |
| per-endpoint limit, on all links to or from any node in the cluster. The |
| following configuration settings have been added to `cassandra.yaml` for |
| resource limits on queued messages. |
| |
| .... |
| internode_application_send_queue_capacity_in_bytes: 4194304 #4MiB |
| internode_application_send_queue_reserve_endpoint_capacity_in_bytes: 134217728 #128MiB |
| internode_application_send_queue_reserve_global_capacity_in_bytes: 536870912 #512MiB |
| internode_application_receive_queue_capacity_in_bytes: 4194304 #4MiB |
| internode_application_receive_queue_reserve_endpoint_capacity_in_bytes: 134217728 #128MiB |
| internode_application_receive_queue_reserve_global_capacity_in_bytes: 536870912 #512MiB |
| .... |
| |
| == Virtual Tables for Messaging Metrics |
| |
| Metrics is improved by keeping metrics using virtual tables for |
| inter-node inbound and outbound messaging |
| (https://issues.apache.org/jira/browse/CASSANDRA-15066[CASSANDRA-15066]). |
| For inbound messaging a virtual table (`internode_inbound`) has been |
| added to keep metrics for: |
| |
| * Bytes and count of messages that could not be serialized or flushed |
| due to an error |
| * Bytes and count of messages scheduled |
| * Bytes and count of messages successfully processed |
| * Bytes and count of messages successfully received |
| * Nanos and count of messages throttled |
| * Bytes and count of messages expired |
| * Corrupt frames recovered and unrecovered |
| |
| A separate virtual table (`internode_outbound`) has been added for |
| outbound inter-node messaging. The outbound virtual table keeps metrics |
| for: |
| |
| * Bytes and count of messages pending |
| * Bytes and count of messages sent |
| * Bytes and count of messages expired |
| * Bytes and count of messages that could not be sent due to an error |
| * Bytes and count of messages overloaded |
| * Active Connection Count |
| * Connection Attempts |
| * Successful Connection Attempts |
| |
| == Hint Messaging |
| |
| A specialized version of hint message that takes an already encoded in a |
| `ByteBuffer` hint and sends it verbatim has been added. It is an |
| optimization for when dispatching a hint file of the current messaging |
| version to a node of the same messaging version, which is the most |
| common case. It saves on extra `ByteBuffer` allocations one redundant |
| hint deserialization-serialization cycle. |
| |
| == Internode Application Timeout |
| |
| A configuration setting has been added to `cassandra.yaml` for the |
| maximum continuous period a connection may be unwritable in application |
| space. |
| |
| .... |
| # internode_application_timeout_in_ms = 30000 |
| .... |
| |
| Some other new features include logging of message size to trace message |
| for tracing a query. |
| |
| == Paxos prepare and propose stage for local requests optimized |
| |
| In pre-4.0 Paxos prepare and propose messages always go through entire |
| `MessagingService` stack in Cassandra even if request is to be served |
| locally, we can enhance and make local requests severed w/o involving |
| `MessagingService`. Similar things are done elsewhere in Cassandra which |
| skips `MessagingService` stage for local requests. |
| |
| This is what it looks like in pre 4.0 if we have tracing on and run a |
| light-weight transaction: |
| |
| .... |
| Sending PAXOS_PREPARE message to /A.B.C.D [MessagingService-Outgoing-/A.B.C.D] | 2017-09-11 |
| 21:55:18.971000 | A.B.C.D | 15045 |
| … REQUEST_RESPONSE message received from /A.B.C.D [MessagingService-Incoming-/A.B.C.D] | |
| 2017-09-11 21:55:18.976000 | A.B.C.D | 20270 |
| … Processing response from /A.B.C.D [SharedPool-Worker-4] | 2017-09-11 21:55:18.976000 | |
| A.B.C.D | 20372 |
| .... |
| |
| Same thing applies for Propose stage as well. |
| |
| In version 4.0 Paxos prepare and propose stage for local requests are |
| optimized |
| (https://issues.apache.org/jira/browse/CASSANDRA-13862[CASSANDRA-13862]). |
| |
| == Quality Assurance |
| |
| Several other quality assurance improvements have been made in version |
| 4.0 |
| (https://issues.apache.org/jira/browse/CASSANDRA-15066[CASSANDRA-15066]). |
| |
| === Framing |
| |
| Version 4.0 introduces framing to all internode messages, i.e. the |
| grouping of messages into a single logical payload with headers and |
| trailers; these frames are guaranteed to either contain at most one |
| message, that is split into its own unique sequence of frames (for large |
| messages), or that a frame contains only complete messages. |
| |
| === Corruption prevention |
| |
| Previously, intra-datacenter internode messages would be unprotected |
| from corruption by default, as only LZ4 provided any integrity checks. |
| All messages to post 4.0 nodes are written to explicit frames, which may |
| be: |
| |
| * LZ4 encoded |
| * CRC protected |
| |
| The Unprotected option is still available. |
| |
| === Resilience |
| |
| For resilience, all frames are written with a separate CRC protected |
| header, of 8 and 6 bytes respectively. If corruption occurs in this |
| header, the connection must be reset, as before. If corruption occurs |
| anywhere outside of the header, the corrupt frame will be skipped, |
| leaving the connection intact and avoiding the loss of any messages |
| unnecessarily. |
| |
| Previously, any issue at any point in the stream would result in the |
| connection being reset, with the loss of any in-flight messages. |
| |
| === Efficiency |
| |
| The overall memory usage, and number of byte shuffles, on both inbound |
| and outbound messages is reduced. |
| |
| Outbound the Netty LZ4 encoder maintains a chunk size buffer (64KiB), |
| that is filled before any compressed frame can be produced. Our frame |
| encoders avoid this redundant copy, as well as freeing 192KiB per |
| endpoint. |
| |
| Inbound, frame decoders guarantee only to copy the number of bytes |
| necessary to parse a frame, and to never store more bytes than |
| necessary. This improvement applies twice to LZ4 connections, improving |
| both the message decode and the LZ4 frame decode. |
| |
| === Inbound Path |
| |
| Version 4.0 introduces several improvements to the inbound path. |
| |
| An appropriate message handler is used based on whether large or small |
| messages are expected on a particular connection as set in a flag. |
| `NonblockingBufferHandler`, running on event loop, is used for small |
| messages, and `BlockingBufferHandler`, running off event loop, for large |
| messages. The single implementation of `InboundMessageHandler` handles |
| messages of any size effectively by deriving size of the incoming |
| message from the byte stream. In addition to deriving size of the |
| message from the stream, incoming message expiration time is proactively |
| read, before attempting to deserialize the entire message. If it’s |
| expired at the time when a message is encountered the message is just |
| skipped in the byte stream altogether. And if a message fails to be |
| deserialized while still on the receiving side - say, because of table |
| id or column being unknown - bytes are skipped, without dropping the |
| entire connection and losing all the buffered messages. An immediately |
| reply back is sent to the coordinator node with the failure reason, |
| rather than waiting for the coordinator callback to expire. This logic |
| is extended to a corrupted frame; a corrupted frame is safely skipped |
| over without dropping the connection. |
| |
| Inbound path imposes strict limits on memory utilization. Specifically, |
| the memory occupied by all parsed, but unprocessed messages is bound - |
| on per-connection, per-endpoint, and global basis. Once a connection |
| exceeds its local unprocessed capacity and cannot borrow any permits |
| from per-endpoint and global reserve, it simply stops processing further |
| messages, providing natural backpressure - until sufficient capacity is |
| regained. |
| |
| === Outbound Connections |
| |
| ==== Opening a connection |
| |
| A consistent approach is adopted for all kinds of failure to connect, |
| including: refused by endpoint, incompatible versions, or unexpected |
| exceptions; |
| |
| * Retry forever, until either success or no messages waiting to deliver. |
| * Wait incrementally longer periods before reconnecting, up to a maximum |
| of 1s. |
| * While failing to connect, no reserve queue limits are acquired. |
| |
| ==== Closing a connection |
| |
| * Correctly drains outbound messages that are waiting to be delivered |
| (unless disconnected and fail to reconnect). |
| * Messages written to a closing connection are either delivered or |
| rejected, with a new connection being opened if the old is irrevocably |
| closed. |
| * Unused connections are pruned eventually. |
| |
| ==== Reconnecting |
| |
| We sometimes need to reconnect a perfectly valid connection, e.g. if the |
| preferred IP address changes. We ensure that the underlying connection |
| has no in-progress operations before closing it and reconnecting. |
| |
| ==== Message Failure |
| |
| Propagates to callbacks instantly, better preventing overload by |
| reclaiming committed memory. |
| |
| ===== Expiry |
| |
| * No longer experiences head-of-line blocking (e.g. undroppable message |
| preventing all droppable messages from being expired). |
| * While overloaded, expiry is attempted eagerly on enqueuing threads. |
| * While disconnected we schedule regular pruning, to handle the case |
| where messages are no longer being sent, but we have a large backlog to |
| expire. |
| |
| ===== Overload |
| |
| * Tracked by bytes queued, as opposed to number of messages. |
| |
| ===== Serialization Errors |
| |
| * Do not result in the connection being invalidated; the message is |
| simply completed with failure, and then erased from the frame. |
| * Includes detected mismatch between calculated serialization size to |
| actual. |
| |
| Failures to flush to network, perhaps because the connection has been |
| reset are not currently notified to callback handlers, as the necessary |
| information has been discarded, though it would be possible to do so in |
| future if we decide it is worth our while. |
| |
| ==== QoS |
| |
| "Gossip" connection has been replaced with a general purpose "Urgent" |
| connection, for any small messages impacting system stability. |
| |
| ==== Metrics |
| |
| We track, and expose via Virtual Table and JMX, the number of messages |
| and bytes that: we could not serialize or flush due to an error, we |
| dropped due to overload or timeout, are pending, and have successfully |
| sent. |
| |
| == Added a Message size limit |
| |
| Cassandra pre-4.0 doesn't protect the server from allocating huge |
| buffers for the inter-node Message objects. Adding a message size limit |
| would be good to deal with issues such as a malfunctioning cluster |
| participant. Version 4.0 introduced max message size config param, akin |
| to max mutation size - set to endpoint reserve capacity by default. |
| |
| == Recover from unknown table when deserializing internode messages |
| |
| As discussed in |
| (https://issues.apache.org/jira/browse/CASSANDRA-9289[CASSANDRA-9289]) |
| it would be nice to gracefully recover from seeing an unknown table in a |
| message from another node. Pre-4.0, we close the connection and |
| reconnect, which can cause other concurrent queries to fail. Version 4.0 |
| fixes the issue by wrapping message in-stream with |
| `TrackedDataInputPlus`, catching `UnknownCFException`, and skipping the |
| remaining bytes in this message. TCP won't be closed and it will remain |
| connected for other messages. |