import glob
import os
import subprocess
import time
import distutils.dir_util
import pytest
import logging
from ccmlib import common as ccmcommon
from ccmlib.node import ToolError
from dtest import Tester, create_ks, create_cf, MAJOR_VERSION_4
from tools.assertions import assert_all, assert_none, assert_one
since = pytest.mark.since
logger = logging.getLogger(__name__)
# WARNING: sstableloader tests should be added to TestSSTableGenerationAndLoading (below),
# and not to TestBaseSStableLoader (which is shared with upgrade tests)
# Also used by upgrade_tests/storage_engine_upgrade_test
# to test loading legacy sstables
class TestBaseSStableLoader(Tester):
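    """
    Exercises bin/sstableloader: generates sstables, copies them aside, wipes
    the cluster, and streams the copies back in. Shared with
    upgrade_tests/storage_engine_upgrade_test for loading legacy sstables; a
    derived upgrade test would look roughly like this (the version string is
    hypothetical):

        class TestLoadLegacySSTables(TestBaseSStableLoader):
            upgrade_from = '3.0.x'  # hypothetical version to generate sstables with
            test_compact = True
    """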
@pytest.fixture(autouse=True)
def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.allow_log_errors = True
upgrade_from = None
test_compact = False
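    # COMPACT STORAGE was removed in Cassandra 4.0, so it is only requested on
    # pre-4.0 clusters, and only when a derived test sets test_compact = True.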
def compact(self):
return self.fixture_dtest_setup.cluster.version() < MAJOR_VERSION_4 and self.test_compact
def create_schema(self, session, ks, compression):
create_ks(session, ks, rf=2)
create_cf(session, "standard1", compression=compression, compact_storage=self.compact())
create_cf(session, "counter1", compression=compression, columns={'v': 'counter'},
compact_storage=self.compact())
    def skip_base_class_test(self):
        if self.__class__.__name__ == 'TestBaseSStableLoader' and self.upgrade_from is None:
            pytest.skip("Don't need to run base class test, only derived classes")
def create_schema_40(self, session, ks, compression):
create_ks(session, ks, rf=2)
create_cf(session, "standard1", compression=compression, compact_storage=self.compact())
create_cf(session, "counter1", key_type='text', compression=compression, columns={'column1': 'text',
'v': 'counter static',
'value': 'counter'},
primary_key="key, column1", clustering='column1 ASC', compact_storage=self.compact())
def test_sstableloader_compression_none_to_none(self):
self.skip_base_class_test()
self.load_sstable_with_configuration(None, None)
def test_sstableloader_compression_none_to_snappy(self):
self.skip_base_class_test()
self.load_sstable_with_configuration(None, 'Snappy')
def test_sstableloader_compression_none_to_deflate(self):
self.skip_base_class_test()
self.load_sstable_with_configuration(None, 'Deflate')
def test_sstableloader_compression_snappy_to_none(self):
self.skip_base_class_test()
self.load_sstable_with_configuration('Snappy', None)
def test_sstableloader_compression_snappy_to_snappy(self):
self.skip_base_class_test()
self.load_sstable_with_configuration('Snappy', 'Snappy')
def test_sstableloader_compression_snappy_to_deflate(self):
self.skip_base_class_test()
self.load_sstable_with_configuration('Snappy', 'Deflate')
def test_sstableloader_compression_deflate_to_none(self):
self.skip_base_class_test()
self.load_sstable_with_configuration('Deflate', None)
def test_sstableloader_compression_deflate_to_snappy(self):
self.skip_base_class_test()
self.load_sstable_with_configuration('Deflate', 'Snappy')
def test_sstableloader_compression_deflate_to_deflate(self):
self.skip_base_class_test()
self.load_sstable_with_configuration('Deflate', 'Deflate')
def test_sstableloader_with_mv(self):
"""
@jira_ticket CASSANDRA-11275
"""
self.skip_base_class_test()
        def create_schema_with_mv(self, session, ks, compression):
            self.create_schema(session, ks, compression)
            # create a materialized view; its primary key must include every
            # primary key column of the base table
            session.execute("CREATE MATERIALIZED VIEW mv1 AS "
                            "SELECT key FROM standard1 WHERE key IS NOT NULL AND c IS NOT NULL AND v IS NOT NULL "
                            "PRIMARY KEY (v, key, c)")
self.load_sstable_with_configuration(ks='"Keyspace1"', create_schema=create_schema_with_mv)
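    # Copy each non-system keyspace's sstables from every dataN directory to a
    # dataN_copy sibling so they survive the later cluster.clear().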
def copy_sstables(self, cluster, node):
for x in range(0, cluster.data_dir_count):
data_dir = os.path.join(node.get_path(), 'data{0}'.format(x))
copy_root = os.path.join(node.get_path(), 'data{0}_copy'.format(x))
for ddir in os.listdir(data_dir):
keyspace_dir = os.path.join(data_dir, ddir)
if os.path.isdir(keyspace_dir) and ddir != 'system':
copy_dir = os.path.join(copy_root, ddir)
distutils.dir_util.copy_tree(keyspace_dir, copy_dir)
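    # Stream the copied sstables back into the cluster, invoking
    # bin/sstableloader once per table directory.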
def load_sstables(self, cluster, node, ks):
cdir = node.get_install_dir()
sstableloader = os.path.join(cdir, 'bin', ccmcommon.platform_binary('sstableloader'))
env = ccmcommon.make_cassandra_env(cdir, node.get_path())
host = node.address()
for x in range(0, cluster.data_dir_count):
sstablecopy_dir = os.path.join(node.get_path(), 'data{0}_copy'.format(x), ks.strip('"'))
for cf_dir in os.listdir(sstablecopy_dir):
full_cf_dir = os.path.join(sstablecopy_dir, cf_dir)
if os.path.isdir(full_cf_dir):
cmd_args = [sstableloader, '--nodes', host, full_cf_dir]
p = subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env)
stdout, stderr = p.communicate()
exit_status = p.returncode
logger.debug('stdout: {out}'.format(out=stdout.decode("utf-8")))
logger.debug('stderr: {err}'.format(err=stderr.decode("utf-8")))
assert 0 == exit_status, \
"sstableloader exited with a non-zero status: {}".format(exit_status)
def load_sstable_with_configuration(self, pre_compression=None, post_compression=None, ks="ks", create_schema=create_schema):
"""
tests that the sstableloader works by using it to load data.
Compression of the columnfamilies being loaded, and loaded into
can be specified.
pre_compression and post_compression can be these values:
None, 'Snappy', or 'Deflate'.
"""
NUM_KEYS = 1000
for compression_option in (pre_compression, post_compression):
assert compression_option in (None, 'Snappy', 'Deflate')
logger.debug("Testing sstableloader with pre_compression=%s and post_compression=%s" % (pre_compression, post_compression))
if self.upgrade_from:
logger.debug("Testing sstableloader with upgrade_from=%s and compact=%s" % (self.upgrade_from, self.compact))
cluster = self.cluster
if self.upgrade_from:
logger.debug("Generating sstables with version %s" % (self.upgrade_from))
default_install_version = self.cluster.version()
default_install_dir = self.cluster.get_install_dir()
# Forcing cluster version on purpose
cluster.set_install_dir(version=self.upgrade_from)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
logger.debug("Using jvm_args={}".format(self.jvm_args))
cluster.populate(2).start(jvm_args=list(self.jvm_args))
node1, node2 = cluster.nodelist()
time.sleep(.5)
logger.debug("creating keyspace and inserting")
session = self.cql_connection(node1)
        create_schema(self, session, ks, pre_compression)
for i in range(NUM_KEYS):
session.execute("UPDATE standard1 SET v='{}' WHERE KEY='{}' AND c='col'".format(i, i))
session.execute("UPDATE counter1 SET v=v+1 WHERE KEY='{}'".format(i))
        # We will upgrade to a version that no longer supports COMPACT STORAGE,
        # so drop compact storage before upgrading (this doesn't actually fix it yet)
        if self.compact() and default_install_version >= MAJOR_VERSION_4:
            session.execute('ALTER TABLE standard1 DROP COMPACT STORAGE')
            session.execute('ALTER TABLE counter1 DROP COMPACT STORAGE')
node1.nodetool('rebuild')
node1.nodetool('cleanup')
node2.nodetool('rebuild')
node2.nodetool('cleanup')
node1.nodetool('drain')
node1.stop()
node2.nodetool('drain')
node2.stop()
logger.debug("Making a copy of the sstables")
# make a copy of the sstables
self.copy_sstables(cluster, node1)
logger.debug("Wiping out the data and restarting cluster")
# wipe out the node data.
cluster.clear()
if self.upgrade_from:
logger.debug("Running sstableloader with version from %s" % (default_install_dir))
# Return to previous version
cluster.set_install_dir(install_dir=default_install_dir)
self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
cluster.start(jvm_args=list(self.jvm_args))
time.sleep(5) # let gossip figure out what is going on
logger.debug("re-creating the keyspace and column families.")
session = self.cql_connection(node1)
if self.test_compact and default_install_version >= MAJOR_VERSION_4:
self.create_schema_40(session, ks, post_compression)
else:
self.create_schema(session, ks, post_compression)
time.sleep(2)
logger.debug("Calling sstableloader")
# call sstableloader to re-load each cf.
self.load_sstables(cluster, node1, ks)
def read_and_validate_data(session):
for i in range(NUM_KEYS):
query = "SELECT * FROM standard1 WHERE KEY='{}'".format(i)
assert_one(session, query, [str(i), 'col', str(i)])
query = "SELECT key, v FROM counter1 WHERE KEY='{}'".format(i)
assert_one(session, query, [str(i), 1])
logger.debug("Reading data back")
# Now we should have sstables with the loaded data, and the existing
# data. Lets read it all to make sure it is all there.
read_and_validate_data(session)
logger.debug("scrubbing, compacting, and repairing")
# do some operations and try reading the data again.
node1.nodetool('scrub')
node1.nodetool('compact')
        try:
            node1.nodetool('repair')
        except ToolError as e:
            logger.debug("Caught ToolError during repair: {}".format(e))
logger.debug("Reading data back one more time")
read_and_validate_data(session)
# check that RewindableDataInputStreamPlus spill files are properly cleaned up
if self.upgrade_from:
for x in range(0, cluster.data_dir_count):
data_dir = os.path.join(node1.get_path(), 'data{0}'.format(x))
for ddir in os.listdir(data_dir):
keyspace_dir = os.path.join(data_dir, ddir)
temp_files = self.glob_data_dirs(os.path.join(keyspace_dir, '*', "tmp", "*.dat"))
logger.debug("temp files: " + str(temp_files))
assert 0 == len(temp_files), "Temporary files were not cleaned up."
class TestSSTableGenerationAndLoading(TestBaseSStableLoader):
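    """
    sstableloader tests that run only against the current version; new
    sstableloader tests belong here rather than in TestBaseSStableLoader
    (which is shared with the upgrade tests).
    """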
def test_sstableloader_uppercase_keyspace_name(self):
"""
Make sure sstableloader works with upper case keyspace
@jira_ticket CASSANDRA-10806
"""
self.load_sstable_with_configuration(ks='"Keyspace1"')
def test_incompressible_data_in_compressed_table(self):
"""
tests for the bug that caused #3370:
https://issues.apache.org/jira/browse/CASSANDRA-3370
@jira_ticket CASSANDRA-3370
inserts random data into a compressed table. The compressed SSTable was
compared to the uncompressed and was found to indeed be larger then
uncompressed.
"""
cluster = self.cluster
cluster.populate(1).start()
node1 = cluster.nodelist()[0]
time.sleep(.5)
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 1)
create_cf(session, 'cf', compression="Deflate")
# make unique column names, and values that are incompressible
for col in range(10):
col_name = str(col)
col_val = os.urandom(5000)
col_val = col_val.hex()
cql = "UPDATE cf SET v='%s' WHERE KEY='0' AND c='%s'" % (col_val, col_name)
session.execute(cql)
node1.flush()
time.sleep(2)
rows = list(session.execute("SELECT * FROM cf WHERE KEY = '0' AND c < '8'"))
assert len(rows) > 0
def test_remove_index_file(self):
"""
tests for situations similar to that found in #343:
https://issues.apache.org/jira/browse/CASSANDRA-343
@jira_ticket CASSANDRA-343
"""
cluster = self.cluster
cluster.populate(1).start()
node1 = cluster.nodelist()[0]
        # Make sure the cluster is ready to accept the subsequent
        # stress connection. This was an issue on Windows.
node1.stress(['write', 'n=10K', 'no-warmup', '-rate', 'threads=8'])
node1.flush()
node1.compact()
node1.stop()
time.sleep(1)
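        # Delete every sstable component except Data.db; the node should still
        # start, and the data files must survive.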
paths = []
for data_dir in node1.data_directories():
basepath = os.path.join(data_dir, 'keyspace1')
for x in os.listdir(basepath):
if x.startswith("standard1"):
path = os.path.join(basepath, x)
                    for pattern in ('*Index.db', '*Filter.db', '*Statistics.db', '*Digest.sha1'):
                        for component in glob.glob(os.path.join(path, pattern)):
                            os.remove(component)
paths.append(path)
node1.start()
time.sleep(10)
data_found = 0
for path in paths:
for fname in os.listdir(path):
if fname.endswith('Data.db'):
data_found += 1
        assert data_found > 0, "After removing index, filter, stats, and digest files, the data file was deleted!"
def test_sstableloader_with_mv(self):
"""
@jira_ticket CASSANDRA-11275
"""
        def create_schema_with_mv(self, session, ks, compression):
            self.cluster.nodelist()[0].set_configuration_options({'enable_materialized_views': 'true'})
            self.create_schema(session, ks, compression)
            # create a materialized view; its primary key must include every
            # primary key column of the base table
            session.execute("CREATE MATERIALIZED VIEW mv1 AS "
                            "SELECT key FROM standard1 WHERE key IS NOT NULL AND c IS NOT NULL AND v IS NOT NULL "
                            "PRIMARY KEY (v, key, c)")
self.load_sstable_with_configuration(ks='"Keyspace1"', create_schema=create_schema_with_mv)
@since('4.0')
def test_sstableloader_with_failing_2i(self):
"""
@jira_ticket CASSANDRA-10130
Simulates an index building failure during SSTables load.
The table data should be loaded and the index should be marked for rebuilding during the next node start.
"""
def create_schema_with_2i(session):
create_ks(session, 'k', 1)
session.execute("CREATE TABLE k.t (p int, c int, v int, PRIMARY KEY(p, c))")
session.execute("CREATE INDEX idx ON k.t(v)")
cluster = self.cluster
cluster.populate(1, install_byteman=True).start()
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
create_schema_with_2i(session)
session.execute("INSERT INTO k.t(p, c, v) VALUES (0, 1, 8)")
# Stop node and copy SSTables
node.nodetool('drain')
node.stop()
self.copy_sstables(cluster, node)
# Wipe out data and restart
cluster.clear()
cluster.start()
# Restore the schema
session = self.patient_cql_connection(node)
create_schema_with_2i(session)
# The table should exist and be empty, and the index should be empty and marked as built
assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx', None])
assert_none(session, "SELECT * FROM k.t")
assert_none(session, "SELECT * FROM k.t WHERE v = 8")
# Add some additional data before loading the SSTable, to check that it will be still accessible
session.execute("INSERT INTO k.t(p, c, v) VALUES (0, 2, 8)")
assert_one(session, "SELECT * FROM k.t", [0, 2, 8])
assert_one(session, "SELECT * FROM k.t WHERE v = 8", [0, 2, 8])
# Load SSTables with a failure during index creation
node.byteman_submit(['./byteman/index_build_failure.btm'])
with pytest.raises(Exception):
self.load_sstables(cluster, node, 'k')
# Check that the index isn't marked as built and the old SSTable data has been loaded but not indexed
assert_none(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""")
assert_all(session, "SELECT * FROM k.t", [[0, 1, 8], [0, 2, 8]])
assert_one(session, "SELECT * FROM k.t WHERE v = 8", [0, 2, 8])
# Restart the node to trigger index rebuild
node.nodetool('drain')
node.stop()
cluster.start()
session = self.patient_cql_connection(node)
# Check that the index is marked as built and the index has been rebuilt
assert_one(session, """SELECT * FROM system."IndexInfo" WHERE table_name='k'""", ['k', 'idx', None])
assert_all(session, "SELECT * FROM k.t", [[0, 1, 8], [0, 2, 8]])
assert_all(session, "SELECT * FROM k.t WHERE v = 8", [[0, 1, 8], [0, 2, 8]])