blob: 6c9f8a1aa19ec07151dbd94fb3af02ddaf82babc [file] [log] [blame]
import time
import pytest
import logging
from cassandra.concurrent import execute_concurrent_with_args
from tools.assertions import assert_invalid, assert_all, assert_one, assert_none, assert_some
from dtest import Tester, create_ks, create_cf
since = pytest.mark.since
logger = logging.getLogger(__name__)
class TestSchema(Tester):
def test_table_alteration(self):
"""
Tests that table alters return as expected with many sstables at different schema points
"""
cluster = self.cluster
cluster.populate(1).start()
node1, = cluster.nodelist()
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 1)
session.execute("use ks;")
session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) "
"WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };")
stmt1 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)")
rows_to_insert = 50
for n in range(5):
parameters = [(x, 'aaa', 'bbb') for x in range(n * rows_to_insert, (n * rows_to_insert) + rows_to_insert)]
execute_concurrent_with_args(session, stmt1, parameters, concurrency=rows_to_insert)
node1.flush()
session.execute("alter table tbl_o_churn add c2 text")
session.execute("alter table tbl_o_churn drop c0")
stmt2 = session.prepare("insert into tbl_o_churn (id, c1, c2) values (?, ?, ?);")
for n in range(5, 10):
parameters = [(x, 'ccc', 'ddd') for x in range(n * rows_to_insert, (n * rows_to_insert) + rows_to_insert)]
execute_concurrent_with_args(session, stmt2, parameters, concurrency=rows_to_insert)
node1.flush()
rows = session.execute("select * from tbl_o_churn")
for row in rows:
if row.id < rows_to_insert * 5:
assert row.c1 == 'bbb'
assert row.c2 is None
assert not hasattr(row, 'c0')
else:
assert row.c1 == 'ccc'
assert row.c2 == 'ddd'
assert not hasattr(row, 'c0')
@since("2.0", max_version="3.X") # Compact Storage
def test_drop_column_compact(self):
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int) WITH COMPACT STORAGE")
assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")
def test_drop_column_compaction(self):
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
# insert some data.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (0, 1, 2)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (1, 2, 3)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (2, 3, 4)")
# drop and readd c1.
session.execute("ALTER TABLE cf DROP c1")
session.execute("ALTER TABLE cf ADD c1 int")
# add another row.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")
node = self.cluster.nodelist()[0]
node.flush()
node.compact()
# test that c1 values have been compacted away.
session = self.patient_cql_connection(node)
assert_all(session, "SELECT c1 FROM ks.cf", [[None], [None], [None], [4]], ignore_order=True)
def test_drop_column_queries(self):
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
session.execute("CREATE INDEX ON cf(c2)")
# insert some data.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (0, 1, 2)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (1, 2, 3)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (2, 3, 4)")
# drop and readd c1.
session.execute("ALTER TABLE cf DROP c1")
session.execute("ALTER TABLE cf ADD c1 int")
# add another row.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")
# test that old (pre-drop) c1 values aren't returned and new ones are.
assert_all(session, "SELECT c1 FROM cf", [[None], [None], [None], [4]], ignore_order=True)
assert_all(session, "SELECT * FROM cf", [[0, None, 2], [1, None, 3], [2, None, 4], [3, 4, 5]], ignore_order=True)
assert_one(session, "SELECT c1 FROM cf WHERE key = 0", [None])
assert_one(session, "SELECT c1 FROM cf WHERE key = 3", [4])
assert_one(session, "SELECT * FROM cf WHERE c2 = 2", [0, None, 2])
assert_one(session, "SELECT * FROM cf WHERE c2 = 5", [3, 4, 5])
def test_drop_column_and_restart(self):
"""
Simply insert data in a table, drop a column involved in the insert and restart the node afterwards.
This ensures that the dropped_columns system table is properly flushed on the alter or the restart
fails as in CASSANDRA-11050.
@jira_ticket CASSANDRA-11050
"""
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE t (k int PRIMARY KEY, c1 int, c2 int)")
session.execute("INSERT INTO t (k, c1, c2) VALUES (0, 0, 0)")
session.execute("ALTER TABLE t DROP c2")
assert_one(session, "SELECT * FROM t", [0, 0])
self.cluster.stop()
self.cluster.start()
session = self.patient_cql_connection(self.cluster.nodelist()[0])
session.execute("USE ks")
assert_one(session, "SELECT * FROM t", [0, 0])
def test_drop_static_column_and_restart(self):
"""
Dropping a static column caused an sstable corrupt exception after restarting, here
we test that we can drop a static column and restart safely.
@jira_ticket CASSANDRA-12582
"""
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE ts (id1 int, id2 int, id3 int static, val text, PRIMARY KEY (id1, id2))")
session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 1, 0, 'v1')")
session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 2, 0, 'v2')")
session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (2, 1, 1, 'v3')")
self.cluster.nodelist()[0].nodetool('flush ks ts')
assert_all(session, "SELECT * FROM ts", [[1, 1, 0, 'v1'], [1, 2, 0, 'v2'], [2, 1, 1, 'v3']])
session.execute("alter table ts drop id3")
assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])
self.cluster.stop()
self.cluster.start()
session = self.patient_cql_connection(self.cluster.nodelist()[0])
session.execute("USE ks")
assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])
@since('3.0')
def drop_table_reflected_in_size_estimates_test(self):
"""
A dropped table should result in its entries being removed from size estimates, on both
nodes that are up and down at the time of the drop.
@jira_ticket CASSANDRA-14905
"""
cluster = self.cluster
cluster.populate(2).start()
node1, node2 = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node1)
create_ks(session, 'ks1', 2)
create_ks(session, 'ks2', 2)
create_cf(session, 'ks1.cf1', columns={'c1': 'text', 'c2': 'text'})
create_cf(session, 'ks2.cf1', columns={'c1': 'text', 'c2': 'text'})
create_cf(session, 'ks2.cf2', columns={'c1': 'text', 'c2': 'text'})
node1.nodetool('refreshsizeestimates')
node2.nodetool('refreshsizeestimates')
node2.stop()
session.execute('DROP TABLE ks2.cf1')
session.execute('DROP KEYSPACE ks1')
node2.start(wait_for_binary_proto=True)
session2 = self.patient_exclusive_cql_connection(node2)
session.cluster.control_connection.wait_for_schema_agreement()
assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks1'")
assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf1'")
assert_some(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf2'")
assert_none(session2, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks1'")
assert_none(session2, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf1'")
assert_some(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf2'")
@since('3.0')
def invalid_entries_removed_from_size_estimates_on_restart_test(self):
"""
Entries for dropped tables/keyspaces should be cleared from size_estimates on restart.
@jira_ticket CASSANDRA-14905
"""
cluster = self.cluster
cluster.populate(1).start()
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
session.execute("USE system;")
session.execute("INSERT INTO size_estimates (keyspace_name, table_name, range_start, range_end, mean_partition_size, partitions_count) VALUES ( 'system_auth', 'bad_table', '-5', '5', 0, 0);")
# Invalid keyspace and table
session.execute("INSERT INTO size_estimates (keyspace_name, table_name, range_start, range_end, mean_partition_size, partitions_count) VALUES ( 'bad_keyspace', 'bad_table', '-5', '5', 0, 0);")
node.stop()
node.start()
session = self.patient_cql_connection(node)
assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='system_auth' AND table_name='bad_table'")
assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='bad_keyspace'")
def prepare(self):
cluster = self.cluster
cluster.populate(1).start()
time.sleep(.5)
nodes = cluster.nodelist()
session = self.patient_cql_connection(nodes[0])
create_ks(session, 'ks', 1)
return session