from distutils.version import LooseVersion
import os
import pytest
import logging
from dtest import Tester
from thrift_test import get_thrift_client
from tools.assertions import assert_all
from thrift_bindings.thrift010.Cassandra import (CfDef, Column, ColumnDef,
ColumnOrSuperColumn, ColumnParent,
ColumnPath, ColumnSlice,
ConsistencyLevel, CounterColumn,
Deletion, IndexExpression,
IndexOperator, IndexType,
InvalidRequestException, KeyRange,
KeySlice, KsDef, MultiSliceRequest,
Mutation, NotFoundException,
SlicePredicate, SliceRange,
SuperColumn)
from upgrade_tests.upgrade_manifest import indev_2_1_x, indev_2_2_x, indev_3_0_x, indev_3_11_x, indev_trunk, \
CASSANDRA_4_0

logger = logging.getLogger(__name__)

# Use static supercolumn data to reduce total test time and avoid driver issues connecting to C* 1.2.
# The data contained in the SSTables is (name, {'attr': {'name': name}}) for the name in NAMES.
SCHEMA_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "schema-2.0.cql")
TABLES_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "supcols", "cols")
NAMES = [name.encode() for name in ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"]]


@pytest.mark.upgrade_test
class TestSCUpgrade(Tester):
"""
    Tests upgrading a cluster loaded with super column data pre-built on Cassandra 2.0 through all later
    versions. Verifies the data with both CQL and Thrift.
"""
@pytest.fixture(autouse=True)
def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.allow_log_errors = True
fixture_dtest_setup.ignore_log_patterns = (
# This one occurs if we do a non-rolling upgrade, the node
# it's trying to send the migration to hasn't started yet,
# and when it does, it gets replayed and everything is fine.
r'Can\'t send migration request: node.*is down',
)
if fixture_dtest_setup.dtest_config.cassandra_version_from_build < '2.2':
_known_teardown_race_error = (
'ScheduledThreadPoolExecutor$ScheduledFutureTask@[0-9a-f]+ '
'rejected from org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor'
)
# don't alter ignore_log_patterns on the class, just the obj for this test
            fixture_dtest_setup.ignore_log_patterns += (_known_teardown_race_error,)
def prepare(self, num_nodes=1, cassandra_version=indev_2_1_x.version):
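        """
        Start a cluster (single node by default) pinned to the requested starting version,
        the 2.1 line by default. The memtable_allocation_type config option is dropped if
        present, since versions older than 2.1 do not recognize it.
        """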
cluster = self.cluster
# Forcing cluster version on purpose
cluster.set_install_dir(version=cassandra_version)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
if "memtable_allocation_type" in cluster._config_options:
del cluster._config_options['memtable_allocation_type']
cluster.populate(num_nodes).start()
return cluster
def verify_with_thrift(self):
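        """
        Read every row back over the Thrift interface and check that the first column value
        matches the row key. Skipped on 4.0+, where Thrift no longer exists.
        """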
# No more thrift in 4.0
if self.cluster.version() >= '4':
return
node = self.cluster.nodelist()[0]
host, port = node.network_interfaces['thrift']
client = get_thrift_client(host, port)
client.transport.open()
client.set_keyspace('supcols')
p = SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 1000))
for name in NAMES:
super_col_value = client.get_slice(name, ColumnParent("cols"), p, ConsistencyLevel.ONE)
logger.debug("get_slice(%s) returned %s" % (name, super_col_value))
assert name == super_col_value[0].column.value
def verify_with_cql(self, session):
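        """
        Read the flattened super column data back over CQL and compare it against the
        expected (key, column1, column2, value) rows; the fixed expected ordering reflects
        the partitioner's token order of the keys, not insertion order.
        """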
session.execute("USE supcols")
expected = [[name.encode(), 'attr'.encode(), 'name', name.encode()] for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']]
assert_all(session, "SELECT * FROM cols", expected)
def _upgrade_super_columns_through_versions_test(self, upgrade_path):
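        """
        Create the COMPACT STORAGE table on the starting version, bulk-load the pre-built
        Cassandra 2.0 super column SSTables, then upgrade through each version in
        upgrade_path, verifying the data with CQL and (where still available) Thrift at
        every step. COMPACT STORAGE is dropped before crossing into 4.0.
        """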
cluster = self.prepare()
node1 = cluster.nodelist()[0]
node1.run_cqlsh(cmds="""CREATE KEYSPACE supcols WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
USE supcols;
CREATE TABLE cols (
key blob,
column1 blob,
column2 text,
value blob,
PRIMARY KEY ((key), column1, column2)
) WITH COMPACT STORAGE AND
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'SnappyCompressor'};
""")
node1.bulkload(options=[TABLES_PATH])
node1.nodetool("upgradesstables -a")
session = self.patient_exclusive_cql_connection(node1)
self.verify_with_cql(session)
self.verify_with_thrift()
for version in upgrade_path:
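            # COMPACT STORAGE support is gone in 4.0, so the table has to drop it before
            # the cluster is upgraded across that boundary.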
if LooseVersion(version.family) >= CASSANDRA_4_0:
session.execute("ALTER TABLE supcols.cols DROP COMPACT STORAGE")
self.upgrade_to_version(version.version)
session = self.patient_exclusive_cql_connection(node1)
self.verify_with_cql(session)
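            # Thrift is off by default on newer versions and removed entirely in 4.0, so
            # only re-enable and verify it while the cluster is still pre-4.0.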
if self.cluster.version() < CASSANDRA_4_0:
node1.nodetool("enablethrift")
self.verify_with_thrift()
cluster.remove(node=node1)
def test_upgrade_super_columns_through_all_versions(self):
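        """Upgrade from the starting 2.1 cluster through 2.2, 3.0, and 3.11 up to trunk."""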
self._upgrade_super_columns_through_versions_test(upgrade_path=[indev_2_2_x, indev_3_0_x,
indev_3_11_x, indev_trunk])
def test_upgrade_super_columns_through_limited_versions(self):
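        """Upgrade from the starting 2.1 cluster through 3.0 and then directly to trunk."""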
self._upgrade_super_columns_through_versions_test(upgrade_path=[indev_3_0_x, indev_trunk])
def upgrade_to_version(self, tag, nodes=None):
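        """
        Drain and stop the given nodes (all nodes by default), point them at the install
        directory for `tag`, then restart them on the new version and run upgradesstables
        on each.
        """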
logger.debug('Upgrading to ' + tag)
if nodes is None:
nodes = self.cluster.nodelist()
for node in nodes:
logger.debug('Shutting down node: ' + node.name)
node.drain()
node.watch_log_for("DRAINED")
node.stop(wait_other_notice=False)
# Update Cassandra Directory
for node in nodes:
node.set_install_dir(version=tag)
logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
self.cluster.set_install_dir(version=tag)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
for node in nodes:
if tag < "2.1":
if "memtable_allocation_type" in node.config_options:
                    del node.config_options["memtable_allocation_type"]
# Restart nodes on new version
for node in nodes:
logger.debug('Starting %s on new version (%s)' % (node.name, tag))
# Setup log4j / logback again (necessary moving from 2.0 -> 2.1):
node.set_log_level("INFO")
node.start(wait_for_binary_proto=True)
node.nodetool('upgradesstables -a')