blob: 5b96e643c62baae967c55a4a6f63aca4f36d9ed5 [file] [log] [blame]
import os
from collections import OrderedDict
from dtest import CASSANDRA_VERSION_FROM_BUILD, Tester, debug
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily
from tools.assertions import assert_all
# Use static supercolumn data to reduce total test time and avoid driver issues connecting to C* 1.2.
# The data contained in the SSTables is (name, {'attr': {'name': name}}) for the name in NAMES.
SCHEMA_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "schema-2.0.cql")
TABLES_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "supcols", "cols")
NAMES = ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"]
class TestSCUpgrade(Tester):
"""
Tests upgrade between a 2.0 cluster with predefined super columns and all other versions. Verifies data with both
CQL and Thrift.
"""
def __init__(self, *args, **kwargs):
self.ignore_log_patterns = [
# This one occurs if we do a non-rolling upgrade, the node
# it's trying to send the migration to hasn't started yet,
# and when it does, it gets replayed and everything is fine.
r'Can\'t send migration request: node.*is down',
]
if CASSANDRA_VERSION_FROM_BUILD < '2.2':
_known_teardown_race_error = (
'ScheduledThreadPoolExecutor$ScheduledFutureTask@[0-9a-f]+ '
'rejected from org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor'
)
# don't alter ignore_log_patterns on the class, just the obj for this test
self.ignore_log_patterns += [_known_teardown_race_error]
Tester.__init__(self, *args, **kwargs)
def prepare(self, num_nodes=1, cassandra_version="git:cassandra-2.1"):
cluster = self.cluster
# Forcing cluster version on purpose
cluster.set_install_dir(version=cassandra_version)
if "memtable_allocation_type" in cluster._config_options:
del cluster._config_options['memtable_allocation_type']
cluster.populate(num_nodes).start()
return cluster
def verify_with_thrift(self):
# No more thrift in 4.0
if self.cluster.version() >= '4':
return
pool = ConnectionPool("supcols", pool_size=1)
super_col_fam = ColumnFamily(pool, "cols")
for name in NAMES:
super_col_value = super_col_fam.get(name)
self.assertEqual(OrderedDict([(('attr', u'name'), name)]), super_col_value)
def verify_with_cql(self, session):
session.execute("USE supcols")
expected = [[name, 'attr', u'name', name] for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']]
assert_all(session, "SELECT * FROM cols", expected)
def _upgrade_super_columns_through_versions_test(self, upgrade_path):
cluster = self.prepare()
node1 = cluster.nodelist()[0]
node1.run_cqlsh(cmds="""CREATE KEYSPACE supcols WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
USE supcols;
CREATE TABLE cols (
key blob,
column1 blob,
column2 text,
value blob,
PRIMARY KEY ((key), column1, column2)
) WITH COMPACT STORAGE AND
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'SnappyCompressor'};
""")
node1.bulkload(options=[TABLES_PATH])
node1.nodetool("upgradesstables -a")
session = self.patient_exclusive_cql_connection(node1)
self.verify_with_cql(session)
self.verify_with_thrift()
for version in upgrade_path:
self.upgrade_to_version(version)
if self.cluster.version() < '4':
node1.nodetool("enablethrift")
session = self.patient_exclusive_cql_connection(node1)
self.verify_with_cql(session)
self.verify_with_thrift()
cluster.remove(node=node1)
def upgrade_super_columns_through_all_versions_test(self):
self._upgrade_super_columns_through_versions_test(upgrade_path=['git:cassandra-2.2', 'git:cassandra-3.0',
'git:cassandra-3.9', 'git:trunk'])
def upgrade_super_columns_through_limited_versions_test(self):
self._upgrade_super_columns_through_versions_test(upgrade_path=['git:cassandra-3.0', 'git:trunk'])
def upgrade_to_version(self, tag, nodes=None):
debug('Upgrading to ' + tag)
if nodes is None:
nodes = self.cluster.nodelist()
for node in nodes:
debug('Shutting down node: ' + node.name)
node.drain()
node.watch_log_for("DRAINED")
node.stop(wait_other_notice=False)
# Update Cassandra Directory
for node in nodes:
node.set_install_dir(version=tag)
if tag < "2.1":
if "memtable_allocation_type" in node.config_options:
node.config_options.__delitem__("memtable_allocation_type")
debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
self.cluster.set_install_dir(version=tag)
# Restart nodes on new version
for node in nodes:
debug('Starting %s on new version (%s)' % (node.name, tag))
# Setup log4j / logback again (necessary moving from 2.0 -> 2.1):
node.set_log_level("INFO")
node.start(wait_other_notice=True, wait_for_binary_proto=True)
node.nodetool('upgradesstables -a')