blob: 46b8e8e91e82f4274c0e500f1667858ea46482a2 [file] [log] [blame]
import time
import pytest
import logging
from cassandra.query import dict_factory
from ccmlib.node import NodeError
from dtest import Tester
from cassandra.protocol import SyntaxException, ConfigurationException
since = pytest.mark.since
logger = logging.getLogger(__name__)
VERSION_311 = 'github:apache/cassandra-3.11'
VERSION_TRUNK = 'github:apache/trunk'
@pytest.mark.upgrade_test
@since('4.0')
class TestUpgradeSuperColumnsThrough(Tester):
def upgrade_to_version(self, tag, start_rpc=True, wait=True, nodes=None):
logger.debug('Upgrading to ' + tag)
if nodes is None:
nodes = self.cluster.nodelist()
for node in nodes:
logger.debug('Shutting down node: ' + node.name)
node.drain()
node.watch_log_for("DRAINED")
node.stop(wait_other_notice=False)
# Update Cassandra Directory
for node in nodes:
node.set_install_dir(version=tag)
logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
self.cluster.set_install_dir(version=tag)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
# Restart nodes on new version
for node in nodes:
logger.debug('Starting %s on new version (%s)' % (node.name, tag))
node.start(wait_other_notice=wait, wait_for_binary_proto=wait)
def prepare(self, num_nodes=1, cassandra_version="github:apache/cassandra-2.2"):
cluster = self.cluster
# Forcing cluster version on purpose
cluster.set_install_dir(version=cassandra_version)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
cluster.populate(num_nodes)
cluster.start()
return cluster
def test_mixed_cluster(self):
cluster = self.prepare(num_nodes=2, cassandra_version=VERSION_311)
node1, node2 = self.cluster.nodelist()
node1.drain()
node1.watch_log_for("DRAINED")
node1.stop(wait_other_notice=False)
node1.set_install_dir(version=VERSION_TRUNK)
node1.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node2, row_factory=dict_factory)
# Schema propagation will time out
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '2' };")
thrown = False
try:
session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
except (ConfigurationException, SyntaxException):
thrown = True
assert thrown
def test_upgrade_with_dropped_compact_storage(self):
cluster = self.prepare(cassandra_version=VERSION_311)
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
session.execute("CREATE TABLE ks.compact_table (pk int PRIMARY KEY, col1 int, col2 int) WITH COMPACT STORAGE")
for i in range(1, 5):
session.execute("INSERT INTO ks.compact_table (pk, col1, col2) VALUES ({i}, {i}, {i})".format(i=i))
session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE")
self.upgrade_to_version(VERSION_TRUNK, wait=True)
session = self.patient_cql_connection(node, row_factory=dict_factory)
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = 1")) ==
[{'col2': 1, 'pk': 1, 'column1': None, 'value': None, 'col1': 1}])
def test_upgrade_with_dropped_compact_storage_index(self):
cluster = self.prepare(cassandra_version=VERSION_311)
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node, row_factory=dict_factory)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy','replication_factor': '1' };")
session.execute("CREATE TABLE ks.compact_table (pk ascii PRIMARY KEY, col1 ascii) WITH COMPACT STORAGE")
session.execute("CREATE INDEX ON ks.compact_table(col1)")
for i in range(1, 10):
session.execute("INSERT INTO ks.compact_table (pk, col1) VALUES ('{pk}', '{col1}')".format(pk=i, col1=i * 10))
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'pk': '5', 'col1': '50'}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'pk': '5', 'col1': '50'}])
session.execute("ALTER TABLE ks.compact_table DROP COMPACT STORAGE")
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
self.upgrade_to_version(VERSION_TRUNK, wait=True)
session = self.patient_cql_connection(node, row_factory=dict_factory)
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE col1 = '50'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])
assert (list(session.execute("SELECT * FROM ks.compact_table WHERE pk = '5'")) ==
[{'col1': '50', 'column1': None, 'pk': '5', 'value': None}])