blob: 8aaea3d660a0840d742dceb5db1cd5a73bdda1a6 [file] [log] [blame]
import time
import pytest
import logging
from cassandra.concurrent import execute_concurrent_with_args
from tools.assertions import assert_invalid, assert_all, assert_one
from dtest import Tester, create_ks
since = pytest.mark.since
logger = logging.getLogger(__name__)
class TestSchema(Tester):
def test_table_alteration(self):
"""
Tests that table alters return as expected with many sstables at different schema points
"""
cluster = self.cluster
cluster.populate(1).start()
node1, = cluster.nodelist()
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 1)
session.execute("use ks;")
session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) "
"WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };")
stmt1 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)")
rows_to_insert = 50
for n in range(5):
parameters = [(x, 'aaa', 'bbb') for x in range(n * rows_to_insert, (n * rows_to_insert) + rows_to_insert)]
execute_concurrent_with_args(session, stmt1, parameters, concurrency=rows_to_insert)
node1.flush()
session.execute("alter table tbl_o_churn add c2 text")
session.execute("alter table tbl_o_churn drop c0")
stmt2 = session.prepare("insert into tbl_o_churn (id, c1, c2) values (?, ?, ?);")
for n in range(5, 10):
parameters = [(x, 'ccc', 'ddd') for x in range(n * rows_to_insert, (n * rows_to_insert) + rows_to_insert)]
execute_concurrent_with_args(session, stmt2, parameters, concurrency=rows_to_insert)
node1.flush()
rows = session.execute("select * from tbl_o_churn")
for row in rows:
if row.id < rows_to_insert * 5:
assert row.c1 == 'bbb'
assert row.c2 is None
assert not hasattr(row, 'c0')
else:
assert row.c1 == 'ccc'
assert row.c2 == 'ddd'
assert not hasattr(row, 'c0')
@since("2.0", max_version="3.X") # Compact Storage
def test_drop_column_compact(self):
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int) WITH COMPACT STORAGE")
assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")
def test_drop_column_compaction(self):
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
# insert some data.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (0, 1, 2)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (1, 2, 3)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (2, 3, 4)")
# drop and readd c1.
session.execute("ALTER TABLE cf DROP c1")
session.execute("ALTER TABLE cf ADD c1 int")
# add another row.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")
node = self.cluster.nodelist()[0]
node.flush()
node.compact()
# test that c1 values have been compacted away.
session = self.patient_cql_connection(node)
assert_all(session, "SELECT c1 FROM ks.cf", [[None], [None], [None], [4]], ignore_order=True)
def test_drop_column_queries(self):
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
session.execute("CREATE INDEX ON cf(c2)")
# insert some data.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (0, 1, 2)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (1, 2, 3)")
session.execute("INSERT INTO cf (key, c1, c2) VALUES (2, 3, 4)")
# drop and readd c1.
session.execute("ALTER TABLE cf DROP c1")
session.execute("ALTER TABLE cf ADD c1 int")
# add another row.
session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")
# test that old (pre-drop) c1 values aren't returned and new ones are.
assert_all(session, "SELECT c1 FROM cf", [[None], [None], [None], [4]], ignore_order=True)
assert_all(session, "SELECT * FROM cf", [[0, None, 2], [1, None, 3], [2, None, 4], [3, 4, 5]], ignore_order=True)
assert_one(session, "SELECT c1 FROM cf WHERE key = 0", [None])
assert_one(session, "SELECT c1 FROM cf WHERE key = 3", [4])
assert_one(session, "SELECT * FROM cf WHERE c2 = 2", [0, None, 2])
assert_one(session, "SELECT * FROM cf WHERE c2 = 5", [3, 4, 5])
def test_drop_column_and_restart(self):
"""
Simply insert data in a table, drop a column involved in the insert and restart the node afterwards.
This ensures that the dropped_columns system table is properly flushed on the alter or the restart
fails as in CASSANDRA-11050.
@jira_ticket CASSANDRA-11050
"""
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE t (k int PRIMARY KEY, c1 int, c2 int)")
session.execute("INSERT INTO t (k, c1, c2) VALUES (0, 0, 0)")
session.execute("ALTER TABLE t DROP c2")
assert_one(session, "SELECT * FROM t", [0, 0])
self.cluster.stop()
self.cluster.start()
session = self.patient_cql_connection(self.cluster.nodelist()[0])
session.execute("USE ks")
assert_one(session, "SELECT * FROM t", [0, 0])
def test_drop_static_column_and_restart(self):
"""
Dropping a static column caused an sstable corrupt exception after restarting, here
we test that we can drop a static column and restart safely.
@jira_ticket CASSANDRA-12582
"""
session = self.prepare()
session.execute("USE ks")
session.execute("CREATE TABLE ts (id1 int, id2 int, id3 int static, val text, PRIMARY KEY (id1, id2))")
session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 1, 0, 'v1')")
session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 2, 0, 'v2')")
session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (2, 1, 1, 'v3')")
self.cluster.nodelist()[0].nodetool('flush ks ts')
assert_all(session, "SELECT * FROM ts", [[1, 1, 0, 'v1'], [1, 2, 0, 'v2'], [2, 1, 1, 'v3']])
session.execute("alter table ts drop id3")
assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])
self.cluster.stop()
self.cluster.start()
session = self.patient_cql_connection(self.cluster.nodelist()[0])
session.execute("USE ks")
assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])
def prepare(self):
cluster = self.cluster
cluster.populate(1).start()
time.sleep(.5)
nodes = cluster.nodelist()
session = self.patient_cql_connection(nodes[0])
create_ks(session, 'ks', 1)
return session