Aside from the main task of executing user requests, the driver also needs to track cluster state and metadata. This is done with a number of administrative components:
(Overview diagram: the control connection owns a single DriverChannel; the topology monitor uses the control connection to query node information and to receive gossip events, which are published as topology events on the event bus; the metadata manager gets node info from the topology monitor, runs schema queries through the control connection, and holds the Metadata instance; the node state manager reacts to topology and channel events from the bus, tells the metadata manager to add or remove nodes, and publishes node state events; metadata changed events are also published on the bus.)
Note: the event bus is covered in the common infrastructure section.
The goal of the control connection is to maintain a dedicated DriverChannel instance, used to:

- receive server-side events: topology events (NEW_NODE, REMOVED_NODE) and status events (UP, DOWN) are published on the event bus, to be processed by other components;
- run administrative queries against the system tables: node information from system.local and system.peers, and schema metadata from system_schema.*.

It has its own reconnection mechanism (if the channel goes down, a new one will be opened to another node in the cluster) and some logic for initialization and shutdown.
Note that the control connection is really just an implementation detail of the metadata manager and topology monitor: if those components are overridden with custom versions that use other means to get their data, the driver will detect it and not initialize the control connection (at the time of writing, the session also references the control connection directly, but that's a bug: JAVA-2473).
The metadata manager is responsible for maintaining the contents of session.getMetadata().
One big improvement in driver 4 is that the Metadata
object is immutable and updated atomically; this guarantees a consistent view of the cluster at a given point in time. For example, if a keyspace name is referenced in the token map, there will always be a corresponding KeyspaceMetadata
in the schema metadata.
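For example, a caller that captures the snapshot once can navigate it without worrying about concurrent updates. A minimal sketch (the keyspace name is a placeholder):

```java
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import java.util.Optional;

public class MetadataSnapshotExample {

  static void inspect(CqlSession session) {
    // Capture the snapshot once: every read below sees the same cluster state.
    Metadata metadata = session.getMetadata();

    Optional<KeyspaceMetadata> keyspace = metadata.getKeyspace(CqlIdentifier.fromCql("test_ks"));
    Optional<TokenMap> tokenMap = metadata.getTokenMap();

    // Because the snapshot is immutable, a keyspace referenced by this token map is
    // guaranteed to also be present in the schema metadata of this same snapshot.
    keyspace.ifPresent(ks -> System.out.println("Tables: " + ks.getTables().keySet()));
    tokenMap.ifPresent(tm -> System.out.println("Token ranges: " + tm.getTokenRanges().size()));
  }
}
```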
MetadataManager
keeps the current Metadata
instance in a volatile field. Each transition is managed by a MetadataRefresh
object that computes the new metadata, along with an optional list of events to publish on the bus (e.g. table created, keyspace removed, etc.) The new metadata is then written back to the volatile field. MetadataManager
follows the confined inner class pattern to ensure that all refreshes are applied serially, from a single admin thread. This guarantees that two refreshes can't start from the same initial state and overwrite each other.
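The pattern itself can be illustrated with a simplified, stand-alone sketch (these are not the driver's actual classes): a volatile reference to an immutable snapshot, with all writes funneled through a single admin thread.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Simplified stand-ins, for illustration only (not the driver's actual classes).
final class ClusterSnapshot {
  final Map<String, String> keyspaces; // stand-in for the real immutable metadata contents

  ClusterSnapshot(Map<String, String> keyspaces) {
    this.keyspaces = Collections.unmodifiableMap(keyspaces);
  }
}

class SnapshotHolder {
  // Any thread can read the current immutable snapshot without locking.
  private volatile ClusterSnapshot current = new ClusterSnapshot(Collections.emptyMap());

  // All refreshes run on a single "admin" thread, so they are applied serially: two
  // refreshes can never start from the same initial state and overwrite each other.
  private final Executor adminExecutor = Executors.newSingleThreadExecutor();

  void applyRefresh(UnaryOperator<ClusterSnapshot> refresh) {
    adminExecutor.execute(() -> current = refresh.apply(current));
  }

  ClusterSnapshot get() {
    return current;
  }
}
```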
There are various types of refreshes targeting nodes, the schema or the token map.
Note that, unlike driver 3, we only do full schema refreshes. This simplifies the code considerably, and thanks to debouncing this should not affect performance. The schema refresh process uses a few auxiliary components that may have different implementations depending on the Cassandra version:
- SchemaQueries: launches the schema queries asynchronously, and assembles the results in a SchemaRows;
- SchemaParser: turns the SchemaRows into a SchemaRefresh.

When the metadata manager needs node-related data, it queries the topology monitor. When it needs schema-related data, it uses the control connection directly to issue its queries.
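The schema refresh itself is conceptually a small asynchronous pipeline: launch the queries, then parse the assembled rows into a refresh. The sketch below uses simplified stand-in types rather than the driver's internal classes, purely to illustrate that flow:

```java
import java.util.concurrent.CompletionStage;

// Stand-ins for the internal types, for illustration only.
interface SchemaRows {}        // the raw rows assembled from the schema queries
interface SchemaRefresh {}     // the refresh that will be applied to the current metadata

interface SchemaQueries {
  // launches the schema queries asynchronously and assembles the results
  CompletionStage<SchemaRows> execute();
}

interface SchemaParser {
  // turns the assembled rows into a full schema refresh
  SchemaRefresh parse(SchemaRows rows);
}

class SchemaRefreshFlow {
  CompletionStage<SchemaRefresh> refreshSchema(SchemaQueries queries, SchemaParser parser) {
    return queries.execute().thenApply(parser::parse);
  }
}
```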
TopologyMonitor abstracts how we get information about nodes in the cluster: it provides node details on demand (the metadata manager queries it whenever it needs node-related data), and fires TopologyEvent instances on the bus when we get external signals suggesting topology changes (node added or removed), or status changes (node down or up).

The built-in implementation uses the control connection to query system.local and system.peers, and listens to gossip events.
NodeStateManager
tracks the state of the nodes in the cluster.
We can't simply trust gossip events because they are not always reliable (the coordinator can become isolated and think other nodes are down). Instead, the driver uses more elaborate rules that combine external signals with observed internal state; for example, as shown in the third example below, a node is only marked down once it has no more active connections and its pool has started reconnecting, and it is marked back up as soon as a connection is reopened.
See the javadocs of NodeState and TopologyEvent, as well as the NodeStateManager implementation itself, for more details.
TopologyEvent and NodeStateEvent are related, but they're used at different stages:

- TopologyEvent is an external signal about the state of a node (by default, a TOPOLOGY_CHANGE or STATUS_CHANGE gossip event received on the control connection). This is considered a mere suggestion, which the driver may or may not decide to follow;
- NodeStateEvent is an actual decision made by the driver to change a node to a given state.

NodeStateManager essentially transforms topology events, as well as other internal signals, into node state events.
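As a purely illustrative sketch of that transformation (these are not the driver's real classes, and the rule shown is just one hypothetical way of combining an external suggestion with internal state):

```java
// Illustrative stand-ins (not the driver's classes).
enum TopologySuggestion { SUGGEST_UP, SUGGEST_DOWN }   // external signal: a mere suggestion
enum NodeStateDecision { UP, DOWN, NO_CHANGE }         // decision actually made by the driver

class SimplifiedNodeStateRules {

  // Hypothetical rule combining the external suggestion with observed internal state.
  NodeStateDecision decide(TopologySuggestion suggestion, int openConnections) {
    switch (suggestion) {
      case SUGGEST_DOWN:
        // Only follow the suggestion if the internal state agrees (no live connections).
        return (openConnections == 0) ? NodeStateDecision.DOWN : NodeStateDecision.NO_CHANGE;
      case SUGGEST_UP:
      default:
        // An "up" suggestion is not trusted blindly either: the node is only marked UP
        // once a connection is actually reopened (see the third example below).
        return NodeStateDecision.NO_CHANGE;
    }
  }
}
```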
In general, other driver components only react to node state events, but there are a few exceptions: for example, if a connection pool is reconnecting and the next attempt is scheduled in 5 minutes, but a SUGGEST_UP topology event is emitted, the pool tries to reconnect immediately.
The best way to find where each event is used is to do a usage search of the event type.
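The short-circuit described above could look roughly like the following sketch (illustrative stand-in code, not the driver's actual pool implementation):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative only: a pool-like component that normally waits for its next scheduled
// reconnection attempt, but short-circuits the wait when an "up" suggestion arrives.
class SimplifiedReconnectingPool {

  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private volatile ScheduledFuture<?> nextAttempt;

  void scheduleNextAttempt(long delayMinutes) {
    nextAttempt = scheduler.schedule(this::tryReconnect, delayMinutes, TimeUnit.MINUTES);
  }

  // Called when a SUGGEST_UP topology event is observed for this pool's node.
  void onSuggestUp() {
    ScheduledFuture<?> pending = nextAttempt;
    if (pending != null && pending.cancel(false)) {
      tryReconnect(); // don't wait for the scheduled attempt, try right away
    }
  }

  private void tryReconnect() {
    // open a new channel to the node (omitted)
  }
}
```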
Most changes to the cluster state will involve the coordinated effort of multiple admin components. Here are a few examples:
Adding a node (triggered by a gossip event):

1. the control connection receives a NEW_NODE gossip event and publishes a TopologyEvent(SUGGEST_ADDED) on the event bus;
2. the node state manager handles the event, checks that the node is not already known, and calls the metadata manager's addNode;
3. the metadata manager asks the topology monitor for the new node's information (getNewNodeInfo); the built-in implementation fetches it with a query to system.peers on the control connection;
4. the metadata manager creates and applies an AddNodeRefresh, then publishes a NodeChangeEvent(ADDED) on the bus.
At this point, other driver components listening on the event bus will get notified of the addition. For example, DefaultSession
will initialize a connection pool to the new node.
Refreshing the schema after a schema change:

1. the control connection receives a SCHEMA_CHANGE gossip event and calls the metadata manager's refreshSchema;
2. the metadata manager asks the topology monitor to check schema agreement (checkSchemaAgreement); the built-in implementation queries system.local and system.peers on the control connection;
3. the metadata manager then runs the schema queries against system_schema on the control connection;
4. it parses the results, creates and applies a SchemaRefresh, and publishes the corresponding schema change events (for example TableChangeEvent(CREATED)) on the bus.
A node losing all of its connections and coming back up:

1. the last channel of a node's pool closes: the pool publishes a ChannelEvent(CLOSED), and the node state manager decrements the node's open connection count;
2. the pool starts reconnecting and publishes a ChannelEvent(RECONNECTION_STARTED); the node state manager increments the node's reconnection count;
3. the node state manager detects that the node has 0 connections and is reconnecting, sets its state to DOWN, and publishes a NodeStateEvent(DOWN);
4. the reconnection eventually succeeds: the pool publishes a ChannelEvent(OPENED), and the node state manager increments the open connection count;
5. the node state manager detects that the node has a connection again, asks the metadata manager to refresh it (refreshNode), sets its state to UP, and publishes a NodeStateEvent(UP);
6. finally, the pool publishes a ChannelEvent(RECONNECTION_STOPPED), and the node state manager decrements the reconnection count.
The topology monitor is a standalone component because some users have asked for a way to use their own discovery service instead of relying on system tables and gossip (see JAVA-1082).
A custom implementation can be plugged in by extending the context and overriding buildTopologyMonitor. It should:

- implement the TopologyMonitor methods by querying the discovery service;
- fire TopologyEvent instances on the bus when it detects topology or status changes.

Read the javadocs for more details; in particular, NodeInfo explains how the driver uses the information returned by the topology monitor.
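A minimal sketch of the context side of that wiring, assuming a recent 4.x driver where DefaultDriverContext takes a DriverConfigLoader and ProgrammaticArguments (the constructor signature and package names below should be double-checked against the actual driver version; the custom monitor itself is injected rather than shown):

```java
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;

// Hypothetical context subclass: swaps in a TopologyMonitor backed by an external
// discovery service (the implementation of that monitor is not shown here).
public class DiscoveryContext extends DefaultDriverContext {

  private final TopologyMonitor discoveryTopologyMonitor;

  public DiscoveryContext(
      DriverConfigLoader configLoader,
      ProgrammaticArguments programmaticArguments,
      TopologyMonitor discoveryTopologyMonitor) {
    super(configLoader, programmaticArguments);
    this.discoveryTopologyMonitor = discoveryTopologyMonitor;
  }

  @Override
  protected TopologyMonitor buildTopologyMonitor() {
    // The driver will use this instance instead of the default implementation
    // that relies on the control connection.
    return discoveryTopologyMonitor;
  }
}
```

The custom context still has to be plugged into the session builder; that wiring is not shown here.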
It's less likely that the metadata manager will be overridden directly. But the schema querying and parsing logic is abstracted behind two factories that handle the differences between Cassandra versions: SchemaQueriesFactory and SchemaParserFactory. These are pluggable by extending the context and overriding the corresponding buildXxx methods.
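The overrides would follow the same pattern as the topology monitor sketch above; the buildSchemaQueriesFactory / buildSchemaParserFactory names and the factory packages below are assumed from the buildXxx convention mentioned in this section, and should be verified against DefaultDriverContext in your driver version:

```java
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory;

// Hypothetical context subclass injecting custom schema query/parse factories.
public class CustomSchemaContext extends DefaultDriverContext {

  private final SchemaQueriesFactory schemaQueriesFactory;
  private final SchemaParserFactory schemaParserFactory;

  public CustomSchemaContext(
      DriverConfigLoader configLoader,
      ProgrammaticArguments programmaticArguments,
      SchemaQueriesFactory schemaQueriesFactory,
      SchemaParserFactory schemaParserFactory) {
    super(configLoader, programmaticArguments);
    this.schemaQueriesFactory = schemaQueriesFactory;
    this.schemaParserFactory = schemaParserFactory;
  }

  @Override
  protected SchemaQueriesFactory buildSchemaQueriesFactory() {
    return schemaQueriesFactory; // runs the schema queries for a specific Cassandra version
  }

  @Override
  protected SchemaParserFactory buildSchemaParserFactory() {
    return schemaParserFactory; // parses the resulting rows into a SchemaRefresh
  }
}
```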