blob: 6732285a0d94f73effd84bd81477105b85481df7 [file] [log] [blame]
import time
import pytest
import logging
from cassandra.query import dict_factory
from ccmlib.node import NodeError
from dtest import Tester
from cassandra.protocol import SyntaxException, ConfigurationException
since = pytest.mark.since
logger = logging.getLogger(__name__)
VERSION_311 = 'github:apache/cassandra-3.11'
VERSION_TRUNK = 'github:apache/trunk'
@pytest.mark.upgrade_test
@since('4.0')
class TestUpgradeSuperColumnsThrough(Tester):
def upgrade_to_version(self, tag, start_rpc=True, wait=True, nodes=None):
logger.debug('Upgrading to ' + tag)
if nodes is None:
nodes = self.cluster.nodelist()
for node in nodes:
logger.debug('Shutting down node: ' + node.name)
node.drain()
node.watch_log_for("DRAINED")
node.stop(wait_other_notice=False)
# Update Cassandra Directory
for node in nodes:
node.set_install_dir(version=tag)
logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
self.cluster.set_install_dir(version=tag)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
# Restart nodes on new version
for node in nodes:
logger.debug('Starting %s on new version (%s)' % (node.name, tag))
node.start(wait_other_notice=wait, wait_for_binary_proto=wait)
def prepare(self, num_nodes=1, cassandra_version="github:apache/cassandra-2.2"):
cluster = self.cluster
# Forcing cluster version on purpose
cluster.set_install_dir(version=cassandra_version)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
cluster.populate(num_nodes)
cluster.start()
return cluster
def test_upgrade_compact_storage(self):
cluster = self.prepare(cassandra_version='github:apache/cassandra-3.0')
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
for i in range(1, 5):
session.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i))
self.upgrade_to_version(VERSION_TRUNK, wait=False)
self.fixture_dtest_setup.allow_log_errors = True
time.sleep(5)
# After restart, it won't start
errors = len(node.grep_log("Compact Tables are not allowed in Cassandra starting with 4.0 version"))
assert errors > 0
def test_mixed_cluster(self):
cluster = self.prepare(num_nodes=2, cassandra_version=VERSION_311)
node1, node2 = self.cluster.nodelist()
node1.drain()
node1.watch_log_for("DRAINED")
node1.stop(wait_other_notice=False)
node1.set_install_dir(version=VERSION_TRUNK)
node1.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node2, row_factory=dict_factory)
# Schema propagation will time out
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '2' };")
thrown = False
try:
session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
except (ConfigurationException, SyntaxException):
thrown = True
assert thrown
def test_upgrade_with_dropped_compact_storage(self):
cluster = self.prepare(cassandra_version=VERSION_311)
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
for i in range(1, 5):
session.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i))
session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE")
self.upgrade_to_version(VERSION_TRUNK, wait=True)
session = self.patient_cql_connection(node, row_factory=dict_factory)
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = 1")) ==
[{'col2': 1, 'pk': 1, 'column1': None, 'value': None, 'col1': 1}])
def test_force_readd_compact_storage(self):
cluster = self.prepare(cassandra_version=VERSION_311)
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
for i in range(1, 5):
session.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i))
session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE")
self.upgrade_to_version(VERSION_TRUNK, wait=True)
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("update system_schema.tables set flags={} where keyspace_name='ks' and table_name='compact_table';")
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = 1")) ==
[{'col2': 1, 'pk': 1, 'column1': None, 'value': None, 'col1': 1}])
self.fixture_dtest_setup.allow_log_errors = True
node.stop(wait_other_notice=False)
node.set_install_dir(version=VERSION_TRUNK)
try:
node.start(wait_other_notice=False, wait_for_binary_proto=False, verbose=False)
except NodeError:
print("error") # ignore
time.sleep(5)
# After restart, it won't start
errors = len(node.grep_log("Compact Tables are not allowed in Cassandra starting with 4.0 version"))
assert errors > 0
def test_upgrade_with_dropped_compact_storage_index(self):
cluster = self.prepare(cassandra_version=VERSION_311)
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
session.execute("CREATE TABLE ks.compact_table (pk ascii PRIMARY KEY, col1 ascii) WITH COMPACT STORAGE")
session.execute("CREATE INDEX ON ks.compact_table(col1)")
for i in range(1, 10):
session.execute("INSERT INTO ks.compact_table (pk, col1) VALUES ('{pk}', '{col1}')".format(pk=i, col1=i * 10))
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'pk': '5', 'col1': '50'}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'pk': '5', 'col1': '50'}])
session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE")
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
self.upgrade_to_version(VERSION_TRUNK, wait=True)
session = self.patient_cql_connection(node, row_factory=dict_factory)
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
def test_downgrade_after_failed_upgrade(self):
"""
The purpose of this test is to show that after verifying early in the startup process in Cassandra 4.0 that
COMPACT STORAGE was not removed prior start of an upgrade, users can still successfully downgrade, remove the
COMPACT STORAGE, and restart the upgrade process.
@jira_ticket CASSANDRA-16063
"""
self.prepare(cassandra_version=VERSION_311)
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
session.execute("CREATE TABLE ks.compact_table (pk ascii PRIMARY KEY, col1 ascii) WITH COMPACT STORAGE")
session.execute("CREATE INDEX ON ks.compact_table(col1)")
for i in range(1, 10):
session.execute("INSERT INTO ks.compact_table (pk, col1) VALUES ('{pk}', '{col1}')".format(pk=i, col1=i * 10))
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'pk': '5', 'col1': '50'}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'pk': '5', 'col1': '50'}])
logging.debug("Upgrading to current version")
self.fixture_dtest_setup.allow_log_errors = True
node.stop(wait_other_notice=False)
node.set_install_dir(version=VERSION_TRUNK)
try:
node.start(wait_other_notice=False, wait_for_binary_proto=False, verbose=False)
except NodeError:
print("error") # ignore
time.sleep(5)
# After restart, it won't start
errors = len(node.grep_log("Compact Tables are not allowed in Cassandra starting with 4.0 version"))
assert errors > 0
logging.debug("Downgrading to 3.11")
node.set_install_dir(version=VERSION_311)
logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
self.cluster.set_install_dir(version=VERSION_311)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
node.start(wait_other_notice=False, wait_for_binary_proto=False)
logger.debug('Starting %s on new version (%s)' % (node.name, VERSION_311))
session = self.patient_cql_connection(node, row_factory=dict_factory)
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'pk': '5', 'col1': '50'}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'pk': '5', 'col1': '50'}])
session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE")
self.upgrade_to_version(VERSION_TRUNK, wait=True)
session = self.patient_cql_connection(node, row_factory=dict_factory)
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])