# blob: 174f0dfec0625c4b4d65bbf9ad2379d18590f3fa [file] [log] [blame]
import os
import time
import pytest
import logging
from cassandra import ConsistencyLevel
from dtest import Tester, create_ks
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2
from tools.assertions import assert_stderr_clean
from tools.jmxutils import (JolokiaAgent, make_mbean)
# Short aliases for the custom pytest markers applied as decorators in this module.
since = pytest.mark.since
ported_to_in_jvm = pytest.mark.ported_to_in_jvm
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@since('3.0')
class TestHintedHandoffConfig(Tester):
    """
    Exercises the hinted handoff configuration options that came with
    CASSANDRA-9035, both through cluster configuration and through nodetool.

    @jira_ticket CASSANDRA-9035
    """

    # Outputs of `nodetool statushandoff` that the tests compare against.
    _RUNNING = 'Hinted handoff is running'
    _NOT_RUNNING = 'Hinted handoff is not running'
    _RUNNING_DC1_DISABLED = 'Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep)

    def _start_two_node_cluster(self, config_options=None):
        """
        Apply the optional configuration options, start a two-node cluster
        and return its nodes.
        """
        cluster_under_test = self.cluster
        if config_options:
            cluster_under_test.set_configuration_options(values=config_options)
        cluster_under_test.populate([2]).start()
        return cluster_under_test.nodelist()

    def _launch_nodetool_cmd(self, node, cmd):
        """
        Run a nodetool command on the node, verify that stderr is clean and
        return what the command wrote to stdout.
        """
        stdout, stderr, _ = node.nodetool(cmd)
        assert_stderr_clean(stderr)
        return stdout

    def _assert_handoff_status(self, node, expected):
        """
        Assert that `nodetool statushandoff` on the node prints exactly the
        expected text once trailing whitespace is stripped.
        """
        status = self._launch_nodetool_cmd(node, 'statushandoff')
        assert expected == status.rstrip()

    def _do_hinted_handoff(self, node1, node2, enabled, keyspace='ks'):
        """
        Write through node1 while node2 is stopped, restart node2, then read
        from node2 alone: the data must be there when hinted handoff is
        enabled and must be missing when it is not.

        Note that node1 is left stopped when this method returns.
        """
        writer = self.patient_exclusive_cql_connection(node1)
        create_ks(writer, keyspace, 2)
        create_c1c2_table(self, writer)

        node2.stop(wait_other_notice=True)
        insert_c1c2(writer, n=100, consistency=ConsistencyLevel.ONE)

        mark = node1.mark_log()
        node2.start()
        if enabled:
            # Wait until node1 logs that hint delivery has finished
            node1.watch_log_for(["Finished hinted"], from_mark=mark, timeout=120)

        # With node1 down, every read below has to be served by node2
        node1.stop(wait_other_notice=True)

        reader = self.patient_exclusive_cql_connection(node2, keyspace=keyspace)
        # Keys are expected to be present with HH enabled, absent otherwise
        expectation = {} if enabled else {'tolerate_missing': True, 'must_be_missing': True}
        for key in range(100):
            query_c1c2(reader, key, ConsistencyLevel.ONE, **expectation)

    @ported_to_in_jvm('4.0')
    def test_nodetool(self):
        """
        Run the handoff-related nodetool commands on each node and check the
        reported status after every one of them.
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True})

        # (command to run first, or None for the initial check; expected status)
        steps = [
            (None, self._RUNNING),
            ('disablehandoff', self._NOT_RUNNING),
            ('enablehandoff', self._RUNNING),
            ('disablehintsfordc dc1', self._RUNNING_DC1_DISABLED),
            ('enablehintsfordc dc1', self._RUNNING),
        ]
        for node in (node1, node2):
            for command, expected in steps:
                if command is not None:
                    self._launch_nodetool_cmd(node, command)
                self._assert_handoff_status(node, expected)

    def test_hintedhandoff_disabled(self):
        """
        Test global hinted handoff disabled
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': False})

        for node in (node1, node2):
            self._assert_handoff_status(node, self._NOT_RUNNING)

        self._do_hinted_handoff(node1, node2, False)

    def test_hintedhandoff_enabled(self):
        """
        Test global hinted handoff enabled
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True})

        for node in (node1, node2):
            self._assert_handoff_status(node, self._RUNNING)

        self._do_hinted_handoff(node1, node2, True)

    @since('4.0')
    def test_hintedhandoff_setmaxwindow(self):
        """
        Test global hinted handoff when max_hint_window_in_ms is changed at
        runtime through nodetool
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True, "max_hint_window_in_ms": 300000})

        for node in (node1, node2):
            self._assert_handoff_status(node, self._RUNNING)
            window = self._launch_nodetool_cmd(node, 'getmaxhintwindow')
            assert 'Current max hint window: 300000 ms' == window.rstrip()

        self._do_hinted_handoff(node1, node2, True)

        # The handoff helper leaves node1 stopped, so bring it back first
        node1.start()
        for node in (node1, node2):
            # Apply the new window on both nodes so HH is effective despite
            # node startup races (CASSANDRA-15865)
            self._launch_nodetool_cmd(node, 'setmaxhintwindow 1')
            window = self._launch_nodetool_cmd(node, 'getmaxhintwindow')
            assert 'Current max hint window: 1 ms' == window.rstrip()

        self._do_hinted_handoff(node1, node2, False, keyspace='ks2')

    def test_hintedhandoff_dc_disabled(self):
        """
        Test global hinted handoff enabled while hints for dc1 are disabled
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True,
                                                     'hinted_handoff_disabled_datacenters': ['dc1']})

        for node in (node1, node2):
            self._assert_handoff_status(node, self._RUNNING_DC1_DISABLED)

        self._do_hinted_handoff(node1, node2, False)

    def test_hintedhandoff_dc_reenabled(self):
        """
        Test global hinted handoff enabled with dc1 disabled at startup and
        re-enabled through nodetool afterwards
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True,
                                                     'hinted_handoff_disabled_datacenters': ['dc1']})

        for node in (node1, node2):
            self._assert_handoff_status(node, self._RUNNING_DC1_DISABLED)

        for node in (node1, node2):
            self._launch_nodetool_cmd(node, 'enablehintsfordc dc1')
            self._assert_handoff_status(node, self._RUNNING)

        self._do_hinted_handoff(node1, node2, True)
class TestHintedHandoff(Tester):
    """
    Hinted handoff scenarios around node decommission and the maximum
    hint window.
    """

    @staticmethod
    def _endpoint_downtime(node_to_query, node):
        """
        Call getEndpointDowntime on node_to_query's Gossiper MBean through
        Jolokia for the endpoint address `node` and return the result.
        """
        gossiper = make_mbean('net', type='Gossiper')
        with JolokiaAgent(node_to_query) as jmx:
            return jmx.execute_method(gossiper, 'getEndpointDowntime(java.lang.String)', [node])

    @ported_to_in_jvm('4.0')
    @pytest.mark.no_vnodes
    def test_hintedhandoff_decom(self):
        """
        Write while node4 is stopped, decommission node1, restart node4,
        decommission node2 and node3, then read every key back.
        """
        self.fixture_dtest_setup.ignore_log_patterns = [
            'Could not update repaired ranges.*Giving up'
        ]
        self.cluster.populate(4).start()
        node1, node2, node3, node4 = self.cluster.nodelist()
        cql = self.patient_cql_connection(node1)
        create_ks(cql, 'ks', 2)
        create_c1c2_table(self, cql)

        node4.stop(wait_other_notice=True)
        insert_c1c2(cql, n=100, consistency=ConsistencyLevel.ONE)
        node1.decommission()
        node4.start(wait_for_binary_proto=True)

        # force=True is only requested for versions from 3.12 on
        use_force = self.cluster.version() >= '3.12'
        for leaving_node in (node2, node3):
            leaving_node.decommission(force=use_force)

        time.sleep(5)
        for key in range(100):
            query_c1c2(cql, key, ConsistencyLevel.ONE)

    @since('4.1')
    def test_hintedhandoff_window(self):
        """
        Check that at most one hint window's worth of hints is stored.

        Before CASSANDRA-14309 another window worth of hints was stored when
        the down node came back up and was taken down again right away, and
        yet another one on a live node when that live node was restarted.

        @jira_ticket CASSANDRA-14309
        """
        # hint_window_persistent_enabled is true by default, so it is not set here
        hint_settings = {
            'max_hint_window_in_ms': 10000,
            'hinted_handoff_enabled': True,
            'max_hints_delivery_threads': 1,
            'hints_flush_period_in_ms': 100,
        }
        self.cluster.set_configuration_options(hint_settings)
        self.cluster.populate(2).start()
        node1, node2 = self.cluster.nodelist()
        cql = self.patient_cql_connection(node1)
        create_ks(cql, 'ks', 2)
        create_c1c2_table(self, cql)

        # Hold hint delivery back until the very end, then take node2 out
        # (binary protocol and gossip off) for the first round of hints
        node1.nodetool('pausehandoff')
        node2.nodetool('disablebinary')
        node2.watch_log_for(["Stop listening for CQL clients"], timeout=120)
        node2.nodetool('disablegossip')
        node2.watch_log_for(["Announcing shutdown", "state jump to shutdown"], timeout=120)
        node1.watch_log_for(["state jump to shutdown"], timeout=120)
        mark1 = node1.mark_log()
        mark2 = node2.mark_log()

        # Round one: the only writes made inside the hint window, and the
        # only ones expected to be replayed
        insert_c1c2(cql, n=(0, 100), consistency=ConsistencyLevel.ONE)

        # Sleep past the 10000 ms hint window configured above
        time.sleep(15)

        # Flip node2's gossip on and off again. Prior to CASSANDRA-14215 this
        # should make the hint window on node1 reset.
        node2.nodetool('enablegossip')
        node2.watch_log_for(["state jump to NORMAL"], timeout=120, from_mark=mark2)
        node1.watch_log_for(["state jump to NORMAL"], timeout=120, from_mark=mark1)
        mark1 = node1.mark_log()
        mark2 = node2.mark_log()
        node2.nodetool('disablegossip')
        node2.watch_log_for(["Announcing shutdown", "state jump to shutdown"], timeout=120, from_mark=mark2)
        node1.watch_log_for(["state jump to shutdown"], timeout=120, from_mark=mark1)
        mark1 = node1.mark_log()
        mark2 = node2.mark_log()

        # Poll until node1 reports a downtime above 5000 for 127.0.0.2
        while self._endpoint_downtime(node1, "127.0.0.2") <= 5000:
            time.sleep(1)

        # Round two: no hints are expected to be stored for these writes
        insert_c1c2(cql, n=(100, 200), consistency=ConsistencyLevel.ONE)

        # Bounce node1. Prior to CASSANDRA-14215 this would reset node1's hint window.
        node1.stop()
        node1.start(wait_for_binary_proto=True, wait_other_notice=False)
        cql = self.patient_exclusive_cql_connection(node1)
        cql.execute('USE ks')

        # Round three: again no hints are expected to be stored
        insert_c1c2(cql, n=(200, 300), consistency=ConsistencyLevel.ONE)

        # Bring node2 fully back and let node1 replay its hints
        node2.nodetool('enablegossip')
        node2.watch_log_for(["state jump to NORMAL"], timeout=120, from_mark=mark2)
        node2.nodetool('enablebinary')
        node2.watch_log_for(["Starting listening for CQL clients"], timeout=120, from_mark=mark2)
        node1.nodetool('resumehandoff')
        node1.watch_log_for('Finished hinted handoff')

        # With node1 stopped, the reads below can only be served by node2
        node1.stop()
        cql = self.patient_exclusive_cql_connection(node2)
        cql.execute('USE ks')

        # Round one must have reached node2 (through replayed hints)
        for key in range(100):
            query_c1c2(cql, key, ConsistencyLevel.ONE)

        # Rounds two and three must be absent on node2
        for key in range(100, 300):
            query_c1c2(cql, key, ConsistencyLevel.ONE, tolerate_missing=True, must_be_missing=True)