blob: d5607e0ad2287abcb6d2cf926d26b02a28844411 [file] [log] [blame]
import time
import logging
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement
from . import assertions
from dtest import create_cf, DtestTimeoutError
from tools.funcutils import get_rate_limited_function
logger = logging.getLogger(__name__)
def create_c1c2_table(tester, session, read_repair=None):
create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'}, read_repair=read_repair)
def insert_c1c2(session, keys=None, n=None, consistency=ConsistencyLevel.QUORUM):
if (keys is None and n is None) or (keys is not None and n is not None):
raise ValueError("Expected exactly one of 'keys' or 'n' arguments to not be None; "
"got keys={keys}, n={n}".format(keys=keys, n=n))
if n:
keys = list(range(n))
statement = session.prepare("INSERT INTO cf (key, c1, c2) VALUES (?, 'value1', 'value2')")
statement.consistency_level = consistency
execute_concurrent_with_args(session, statement, [['k{}'.format(k)] for k in keys])
def query_c1c2(session, key, consistency=ConsistencyLevel.QUORUM, tolerate_missing=False, must_be_missing=False):
query = SimpleStatement('SELECT c1, c2 FROM cf WHERE key=\'k%d\'' % key, consistency_level=consistency)
rows = list(session.execute(query))
if not tolerate_missing:
assertions.assert_length_equal(rows, 1)
res = rows[0]
assert len(res) == 2 and res[0] == 'value1' and res[1] == 'value2', res
if must_be_missing:
assertions.assert_length_equal(rows, 0)
def insert_columns(tester, session, key, columns_count, consistency=ConsistencyLevel.QUORUM, offset=0):
upds = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%06d\'" % (i, key, i) for i in range(offset * columns_count, columns_count * (offset + 1))]
query = 'BEGIN BATCH %s; APPLY BATCH' % '; '.join(upds)
simple_query = SimpleStatement(query, consistency_level=consistency)
session.execute(simple_query)
def query_columns(tester, session, key, columns_count, consistency=ConsistencyLevel.QUORUM, offset=0):
query = SimpleStatement('SELECT c, v FROM cf WHERE key=\'k%s\' AND c >= \'c%06d\' AND c <= \'c%06d\'' % (key, offset, columns_count + offset - 1), consistency_level=consistency)
res = list(session.execute(query))
assertions.assert_length_equal(res, columns_count)
for i in range(0, columns_count):
assert res[i][1] == 'value{}'.format(i + offset)
# Simple puts and get (on one row), testing both reads by names and by slice,
# with overwrites and flushes between inserts to make sure we hit multiple
# sstables on reads
def putget(cluster, session, cl=ConsistencyLevel.QUORUM):
_put_with_overwrite(cluster, session, 1, cl)
# reads by name
# We do not support proper IN queries yet
# if cluster.version() >= "1.2":
# session.execute('SELECT * FROM cf USING CONSISTENCY %s WHERE key=\'k0\' AND c IN (%s)' % (cl, ','.join(ks)))
# else:
# session.execute('SELECT %s FROM cf USING CONSISTENCY %s WHERE key=\'k0\'' % (','.join(ks), cl))
# _validate_row(cluster, session)
# slice reads
query = SimpleStatement('SELECT * FROM cf WHERE key=\'k0\'', consistency_level=cl)
rows = list(session.execute(query))
_validate_row(cluster, rows)
def _put_with_overwrite(cluster, session, nb_keys, cl=ConsistencyLevel.QUORUM):
for k in range(0, nb_keys):
kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i, k, i) for i in range(0, 100)]
query = SimpleStatement('BEGIN BATCH %s APPLY BATCH' % '; '.join(kvs), consistency_level=cl)
session.execute(query)
time.sleep(.01)
cluster.flush()
for k in range(0, nb_keys):
kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i * 4, k, i * 2) for i in range(0, 50)]
query = SimpleStatement('BEGIN BATCH %s APPLY BATCH' % '; '.join(kvs), consistency_level=cl)
session.execute(query)
time.sleep(.01)
cluster.flush()
for k in range(0, nb_keys):
kvs = ["UPDATE cf SET v=\'value%d\' WHERE key=\'k%s\' AND c=\'c%02d\'" % (i * 20, k, i * 5) for i in range(0, 20)]
query = SimpleStatement('BEGIN BATCH %s APPLY BATCH' % '; '.join(kvs), consistency_level=cl)
session.execute(query)
time.sleep(.01)
cluster.flush()
def _validate_row(cluster, res):
assertions.assert_length_equal(res, 100)
for i in range(0, 100):
if i % 5 == 0:
assert res[i][2] == 'value{}'.format(i * 4), 'for {}, expecting value{}, got {}'.format(i, i * 4, res[i][2])
elif i % 2 == 0:
assert res[i][2] == 'value{}'.format(i * 2), 'for {}, expecting value{}, got {}'.format(i, i * 2, res[i][2])
else:
assert res[i][2] == 'value{}'.format(i), 'for {}, expecting value{}, got {}'.format(i, i, res[i][2])
# Simple puts and range gets, with overwrites and flushes between inserts to
# make sure we hit multiple sstables on reads
def range_putget(cluster, session, cl=ConsistencyLevel.QUORUM):
keys = 100
_put_with_overwrite(cluster, session, keys, cl)
paged_results = session.execute('SELECT * FROM cf LIMIT 10000000')
rows = [result for result in paged_results]
assertions.assert_length_equal(rows, keys * 100)
for k in range(0, keys):
res = rows[:100]
del rows[:100]
_validate_row(cluster, res)
def get_keyspace_metadata(session, keyspace_name):
cluster = session.cluster
cluster.refresh_keyspace_metadata(keyspace_name)
return cluster.metadata.keyspaces[keyspace_name]
def get_schema_metadata(session):
cluster = session.cluster
cluster.refresh_schema_metadata()
return cluster.metadata
def get_table_metadata(session, keyspace_name, table_name):
cluster = session.cluster
cluster.refresh_table_metadata(keyspace_name, table_name)
return cluster.metadata.keyspaces[keyspace_name].tables[table_name]
def rows_to_list(rows):
new_list = [list(row) for row in rows]
return new_list
def index_is_built(node, session, keyspace, table_name, idx_name):
# checks if an index has been built
full_idx_name = idx_name if node.get_cassandra_version() > '3.0' else '{}.{}'.format(table_name, idx_name)
index_query = """SELECT * FROM system."IndexInfo" WHERE table_name = '{}' AND index_name = '{}'""".format(keyspace, full_idx_name)
return len(list(session.execute(index_query))) == 1
def block_until_index_is_built(node, session, keyspace, table_name, idx_name):
"""
Waits up to 30 seconds for a secondary index to be built, and raises
DtestTimeoutError if it is not.
"""
start = time.time()
rate_limited_debug_logger = get_rate_limited_function(logger.debug, 5)
while time.time() < start + 30:
rate_limited_debug_logger("waiting for index to build")
time.sleep(1)
if index_is_built(node, session, keyspace, table_name, idx_name):
break
else:
raise DtestTimeoutError()