blob: 510d12f448c900f9a5f2b8b91a391fe7f4a7ee90 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.SinglePartitionSliceCommandTest;
import org.apache.cassandra.db.compaction.Verifier;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.cql3.CQLTester.assertRows;
import static org.apache.cassandra.cql3.CQLTester.row;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
* Tests backwards compatibility for SSTables
public class LegacySSTableTest
private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
public static File LEGACY_SSTABLE_ROOT;
* When adding a new sstable version, add that one here.
* See {@link #testGenerateSstables()} to generate sstables.
* Take care on commit as you need to add the sstable files using {@code git add -f}
public static final String[] legacyVersions = {"mc", "mb", "ma", "la", "ka", "jb"};
// 1200 chars
static final String longString = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
public static void defineSchema() throws ConfigurationException
for (String legacyVersion : legacyVersions)
String scp = System.getProperty(LEGACY_SSTABLE_PROP);
assert scp != null;
LEGACY_SSTABLE_ROOT = new File(scp).getAbsoluteFile();
assert LEGACY_SSTABLE_ROOT.isDirectory();
public void tearDown()
for (String legacyVersion : legacyVersions)
* Get a descriptor for the legacy sstable at the given version.
protected Descriptor getDescriptor(String legacyVersion, String table)
return new Descriptor(legacyVersion, getTableDir(legacyVersion, table), "legacy_tables", table, 1,
SSTableFormat.Type.BIG :SSTableFormat.Type.LEGACY);
public void testLoadLegacyCqlTables() throws Exception
for (String legacyVersion : legacyVersions)
{"Loading legacy version: {}", legacyVersion);
long startCount = CacheService.instance.keyCache.size();
verifyCache(legacyVersion, startCount);
public void testStreamLegacyCqlTables() throws Exception
for (String legacyVersion : legacyVersions)
public void testReverseIterationOfLegacyIndexedSSTable() throws Exception
// During upgrades from 2.1 to 3.0, reverse queries can drop rows before upgradesstables is completed
QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_ka_indexed (" +
" p int," +
" c int," +
" v1 int," +
" v2 int," +
" PRIMARY KEY(p, c)" +
loadLegacyTable("legacy_%s_indexed%s", "ka", "");
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * " +
"FROM legacy_tables.legacy_ka_indexed " +
"WHERE p=1 " +
assertEquals(5000, rs.size());
public void testReadingLegacyIndexedSSTableWithStaticColumns() throws Exception
// During upgrades from 2.1 to 3.0, reading from tables with static columns errors before upgradesstables
// is completed
QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_ka_indexed_static (" +
" p int," +
" c int," +
" v1 int," +
" v2 int," +
" s1 int static," +
" s2 int static," +
" PRIMARY KEY(p, c)" +
loadLegacyTable("legacy_%s_indexed_static%s", "ka", "");
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * " +
"FROM legacy_tables.legacy_ka_indexed_static " +
"WHERE p=1 ");
assertEquals(5000, rs.size());
public void test14766() throws Exception
* During upgrades from 2.1 to 3.0, reading from old sstables in reverse order could omit the very last row if the
* last indexed block had only two Unfiltered-s. See CASSANDRA-14766 for details.
* The sstable used here has two indexed blocks, with 2 cells/rows of ~500 bytes each, with column index interval of 1kb.
* Without the fix SELECT * returns 4 rows in ASC order, but only 3 rows in DESC order, omitting the last one.
QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_ka_14766 (pk int, ck int, value text, PRIMARY KEY (pk, ck));");
loadLegacyTable("legacy_%s_14766%s", "ka", "");
UntypedResultSet rs;
// read all rows in ASC order, expect all 4 to be returned
rs = QueryProcessor.executeInternal("SELECT * FROM legacy_tables.legacy_ka_14766 WHERE pk = 0 ORDER BY ck ASC;");
assertEquals(4, rs.size());
// read all rows in DESC order, expect all 4 to be returned
rs = QueryProcessor.executeInternal("SELECT * FROM legacy_tables.legacy_ka_14766 WHERE pk = 0 ORDER BY ck DESC;");
assertEquals(4, rs.size());
public void test14803() throws Exception
* During upgrades from 2.1 to 3.0, reading from old sstables in reverse order could return early if the sstable
* reverse iterator encounters an indexed block that only covers a single row, and that row starts in the next
* indexed block.
QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_ka_14803 (k int, c int, v1 blob, v2 blob, PRIMARY KEY (k, c));");
loadLegacyTable("legacy_%s_14803%s", "ka", "");
UntypedResultSet forward = QueryProcessor.executeOnceInternal(String.format("SELECT * FROM legacy_tables.legacy_ka_14803 WHERE k=100"));
UntypedResultSet reverse = QueryProcessor.executeOnceInternal(String.format("SELECT * FROM legacy_tables.legacy_ka_14803 WHERE k=100 ORDER BY c DESC"));"{} - {}", forward.size(), reverse.size());
assertEquals(forward.size(), reverse.size());
public void test14873() throws Exception
* When reading 2.1 sstables in 3.0 in reverse order it's possible to wrongly return an empty result set if the
* partition being read has a static row, and the read is performed backwards.
* Contents of the SSTable (column_index_size_in_kb: 1) below:
* insert into legacy_tables.legacy_ka_14873 (pkc, sc) values (0, 0);
* insert into legacy_tables.legacy_ka_14873 (pkc, cc, rc) values (0, 5, '5555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555');
* insert into legacy_tables.legacy_ka_14873 (pkc, cc, rc) values (0, 4, '4444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444');
* insert into legacy_tables.legacy_ka_14873 (pkc, cc, rc) values (0, 3, '3333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333');
* insert into legacy_tables.legacy_ka_14873 (pkc, cc, rc) values (0, 2, '2222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222');
* insert into legacy_tables.legacy_ka_14873 (pkc, cc, rc) values (0, 1, '1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111');
String ddl =
"CREATE TABLE legacy_tables.legacy_ka_14873 ("
+ "pkc int, cc int, sc int static, rc text, PRIMARY KEY (pkc, cc)"
+ ") WITH CLUSTERING ORDER BY (cc DESC) AND compaction = {'enabled' : 'false', 'class' : 'LeveledCompactionStrategy'};";
loadLegacyTable("legacy_%s_14873%s", "ka", "");
UntypedResultSet forward =
String.format("SELECT * FROM legacy_tables.legacy_ka_14873 WHERE pkc = 0 AND cc > 0 ORDER BY cc DESC;"));
UntypedResultSet reverse =
String.format("SELECT * FROM legacy_tables.legacy_ka_14873 WHERE pkc = 0 AND cc > 0 ORDER BY cc ASC;"));
assertEquals(5, forward.size());
assertEquals(5, reverse.size());
public void testMultiBlockRangeTombstones() throws Exception
* During upgrades from 2.1 to 3.0, reading old sstables in reverse order would generate invalid sequences of
* range tombstone bounds if their range tombstones spanned multiple column index blocks. The read would fail
* in different ways depending on whether the 2.1 tables were produced by a flush or a compaction.
String version = "ka";
for (String tableFmt : new String[]{"legacy_%s_compacted_multi_block_rt%s", "legacy_%s_flushed_multi_block_rt%s"})
String table = String.format(tableFmt, version, "");
QueryProcessor.executeOnceInternal(String.format("CREATE TABLE legacy_tables.%s " +
"(k int, c1 int, c2 int, v1 blob, v2 blob, " +
"PRIMARY KEY (k, c1, c2))", table));
loadLegacyTable(tableFmt, version, "");
UntypedResultSet forward = QueryProcessor.executeOnceInternal(String.format("SELECT * FROM legacy_tables.%s WHERE k=100", table));
UntypedResultSet reverse = QueryProcessor.executeOnceInternal(String.format("SELECT * FROM legacy_tables.%s WHERE k=100 ORDER BY c1 DESC, c2 DESC", table));
assertEquals(table, forward.size(), reverse.size());
public void testInaccurateSSTableMinMax() throws Exception
QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_mc_inaccurate_min_max (k int, c1 int, c2 int, c3 int, v int, primary key (k, c1, c2, c3))");
loadLegacyTable("legacy_%s_inaccurate_min_max%s", "mc", "");
sstable has the following mutations:
INSERT INTO legacy_tables.legacy_mc_inaccurate_min_max (k, c1, c2, c3, v) VALUES (100, 4, 4, 4, 4)
DELETE FROM legacy_tables.legacy_mc_inaccurate_min_max WHERE k=100 AND c1<3
String query = "SELECT * FROM legacy_tables.legacy_mc_inaccurate_min_max WHERE k=100 AND c1=1 AND c2=1";
List<Unfiltered> unfiltereds = SinglePartitionSliceCommandTest.getUnfilteredsFromSinglePartition(query);
assertEquals(2, unfiltereds.size());
Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(0)).isOpen(false));
Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(1)).isClose(false));
public void testVerifyOldSSTables() throws Exception
for (String legacyVersion : legacyVersions)
ColumnFamilyStore cfs ="legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion));
for (SSTableReader sstable : cfs.getLiveSSTables())
try (Verifier verifier = new Verifier(cfs, sstable, false))
public void test14912() throws Exception
* When reading 2.1 sstables in 3.0, collection tombstones need to be checked against
* the dropped columns stored in table metadata. Failure to do so can result in unreadable
* rows if a column with the same name but incompatible type has subsequently been added.
* The original (i.e. pre-any ALTER statements) table definition for this test is:
* CREATE TABLE legacy_tables.legacy_ka_14912 (k int PRIMARY KEY, v1 set<text>, v2 text);
* The SSTable loaded emulates data being written before the table is ALTERed and contains:
* insert into legacy_tables.legacy_ka_14912 (k, v1, v2) values (0, {}, 'abc') USING TIMESTAMP 1543244999672280;
* insert into legacy_tables.legacy_ka_14912 (k, v1, v2) values (1, {'abc'}, 'abc') USING TIMESTAMP 1543244999672280;
* The timestamps of the (generated) collection tombstones are 1543244999672279, e.g. the <TIMESTAMP of the mutation> - 1
QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_ka_14912 (k int PRIMARY KEY, v1 text, v2 text)");
loadLegacyTable("legacy_%s_14912%s", "ka", "");
CFMetaData cfm ="legacy_tables").getColumnFamilyStore("legacy_ka_14912").metadata;
ColumnDefinition columnToDrop;
* This first variant simulates the original v1 set<text> column being dropped
* then re-added with the text type:
* CREATE TABLE legacy_tables.legacy_ka_14912 (k int PRIMARY KEY, v1 set<text>, v2 text);
* INSERT INTO legacy_tables.legacy)ka_14912 (k, v1, v2)...
* ALTER TABLE legacy_tables.legacy_ka_14912 DROP v1;
* ALTER TABLE legacy_tables.legacy_ka_14912 ADD v1 text;
columnToDrop = ColumnDefinition.regularDef(cfm,
SetType.getInstance(UTF8Type.instance, true));
cfm.recordColumnDrop(columnToDrop, 1543244999700000L);
// repeat the query, but simulate clock drift by shifting the recorded
// drop time forward so that it occurs before the collection timestamp
cfm.recordColumnDrop(columnToDrop, 1543244999600000L);
* This second test simulates the original v1 set<text> column being dropped
* then re-added with some other, non-collection type (overwriting the dropped
* columns record), then dropping and re-adding again as text type:
* CREATE TABLE legacy_tables.legacy_ka_14912 (k int PRIMARY KEY, v1 set<text>, v2 text);
* INSERT INTO legacy_tables.legacy_ka_14912 (k, v1, v2)...
* ALTER TABLE legacy_tables.legacy_ka_14912 DROP v1;
* ALTER TABLE legacy_tables.legacy_ka_14912 ADD v1 blob;
* ALTER TABLE legacy_tables.legacy_ka_14912 DROP v1;
* ALTER TABLE legacy_tables.legacy_ka_14912 ADD v1 text;
columnToDrop = ColumnDefinition.regularDef(cfm,
cfm.recordColumnDrop(columnToDrop, 1543244999700000L);
// repeat the query, but simulate clock drift by shifting the recorded
// drop time forward so that it occurs before the collection timestamp
cfm.recordColumnDrop(columnToDrop, 1543244999600000L);
public void testReadingLegacyTablesWithIllegalCellNames() throws Exception {
* The sstable can be generated externally with SSTableSimpleUnsortedWriter:
* [
* {"key": "1",
* "cells": [["a:aa:c1","61",1555000750634000],
* ["a:aa:c2","6161",1555000750634000],
* ["a:aa:pk","00000001",1555000750634000],
* ["a:aa:v1","aaa",1555000750634000]]},
* {"key": "2",
* "cells": [["b:bb:c1","62",1555000750634000],
* ["b:bb:c2","6262",1555000750634000],
* ["b:bb:pk","00000002",1555000750634000],
* ["b:bb:v1","bbb",1555000750634000]]}
* ]
* and an extra sstable with only the invalid cell name
* [
* {"key": "3",
* "cells": [["a:aa:pk","68656c6c6f30",1570466358949]]}
* ]
String table = "legacy_ka_with_illegal_cell_names";
QueryProcessor.executeInternal("CREATE TABLE legacy_tables." + table + " (" +
" pk int," +
" c1 text," +
" c2 text," +
" v1 text," +
" PRIMARY KEY(pk, c1, c2))");
loadLegacyTable("legacy_%s_with_illegal_cell_names%s", "ka", "");
UntypedResultSet results =
QueryProcessor.executeOnceInternal("SELECT * FROM legacy_tables."+table);
assertRows(results, row(1, "a", "aa", "aaa"), row(2, "b", "bb", "bbb"), row (3, "a", "aa", null));"legacy_tables").getColumnFamilyStore(table).forceMajorCompaction();
public void testReadingLegacyTablesWithIllegalCellNamesPKLI() throws Exception {
* Makes sure we grab the correct PKLI when we have illegal columns
* sstable looks like this:
* [
* {"key": "3",
* "cells": [["a:aa:","",100],
* ["a:aa:pk","6d656570",200]]}
* ]
this generates the stable on 2.1:
CFMetaData metadata = CFMetaData.compile("create table legacy_tables.legacy_ka_with_illegal_cell_names_2 (pk int, c1 text, c2 text, v1 text, primary key (pk, c1, c2))", "legacy_tables");
try (SSTableSimpleUnsortedWriter writer = new SSTableSimpleUnsortedWriter(new File("/tmp/sstable21"),
new ByteOrderedPartitioner(),
writer.addColumn(new BufferCell(Util.cellname("a", "aa", ""), bytes(""), 100));
writer.addColumn(new BufferCell(Util.cellname("a", "aa", "pk"), bytes("meep"), 200));
String table = "legacy_ka_with_illegal_cell_names_2";
QueryProcessor.executeInternal("CREATE TABLE legacy_tables." + table + " (" +
" pk int," +
" c1 text," +
" c2 text," +
" v1 text," +
" PRIMARY KEY(pk, c1, c2))");
loadLegacyTable("legacy_%s_with_illegal_cell_names_2%s", "ka", "");
ColumnFamilyStore cfs ="legacy_tables").getColumnFamilyStore(table);
assertEquals(1, Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL)));
assertEquals(1, Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL)));
SSTableReader sstable = Iterables.getFirst(cfs.getSSTables(SSTableSet.CANONICAL), null);
LivenessInfo livenessInfo = null;
try (ISSTableScanner scanner = sstable.getScanner())
while (scanner.hasNext())
try (UnfilteredRowIterator iter =
while (iter.hasNext())
Unfiltered uf =;
livenessInfo = ((Row)uf).primaryKeyLivenessInfo();
assertEquals(100, livenessInfo.timestamp());
public void testReadingIndexedLegacyTablesWithIllegalCellNames() throws Exception {
* The sstable can be generated externally with SSTableSimpleUnsortedWriter:
* column_index_size_in_kb: 1
* [
* {"key": "key",
* "cells": [
* ["00000:000000:a","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0],
* ["00000:000000:b","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ["00000:000000:c","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ["00000:000000:z","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ["00001:000001:a","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0],
* ["00001:000001:b","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ["00001:000001:c","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ["00001:000001:z","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* .
* .
* .
* ["00010:000010:a","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0],
* ["00010:000010:b","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ["00010:000010:c","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ["00010:000010:z","00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",0]
* ]
* }
* ]
* Each row in the partition contains only 1 valid cell. The ones with the column name components 'a', 'b' & 'z' are illegal as they refer to PRIMARY KEY
* columns, but SSTables such as this can be generated with offline tools and loaded via SSTableLoader or nodetool refresh (see CASSANDRA-15086) (see
* CASSANDRA-15086) Only 'c' is a valid REGULAR column in the table schema.
* In the initial fix for CASSANDRA-15086, the bytes read by OldFormatDeserializer for these invalid cells are not correctly accounted for, causing
* ReverseIndexedReader to assert that the end of a block has been reached earlier than it actually has, which in turn causes rows to be incorrectly
* ommitted from the results.
* This sstable has been crafted to hit a further potential error condition. Rows 00001:00001 and 00008:00008 interact with the index block boundaries
* in a very specific way; for both of these rows, the (illegal) cells 'a' & 'b', along with the valid 'c' cell are at the end of an index block, but
* the 'z' cell is over the boundary, in the following block. We need to ensure that the bytes consumed for the 'z' cell are properly accounted for and
* not counted toward those for the next row on disk.
QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_ka_with_illegal_cell_names_indexed (" +
" a text," +
" b text," +
" z text," +
" c text," +
" PRIMARY KEY(a, b, z))");
loadLegacyTable("legacy_%s_with_illegal_cell_names_indexed%s", "ka", "");
String queryForward = "SELECT * FROM legacy_tables.legacy_ka_with_illegal_cell_names_indexed WHERE a = 'key'";
String queryReverse = queryForward + " ORDER BY b DESC, z DESC";
List<String> forward = new ArrayList<>();
QueryProcessor.executeOnceInternal(queryForward).forEach(r -> forward.add(r.getString("b") + ":" + r.getString("z")));
List<String> reverse = new ArrayList<>();
QueryProcessor.executeOnceInternal(queryReverse).forEach(r -> reverse.add(r.getString("b") + ":" + r.getString("z")));
assertEquals(11, reverse.size());
assertEquals(11, forward.size());
for (int i=0; i < 11; i++)
assertEquals(forward.get(i), reverse.get(10 - i));
private void assertExpectedRowsWithDroppedCollection(boolean droppedCheckSuccessful)
for (int i=0; i<=1; i++)
UntypedResultSet rows =
String.format("SELECT * FROM legacy_tables.legacy_ka_14912 WHERE k = %s;", i));
assertEquals(1, rows.size());
UntypedResultSet.Row row =;
// If the best-effort attempt to filter dropped columns was successful, then the row
// should not contain the v1 column at all. Likewise, if no column data was written,
// only a tombstone, then no v1 column should be present.
// However, if collection data was written (i.e. where k=1), then if the dropped column
// check didn't filter the legacy cells, we should expect an empty column value as the
// legacy collection tombstone won't cover it and the dropped column check doesn't filter
// it.
if (droppedCheckSuccessful || i == 0)
assertEquals("", row.getString("v1"));
assertEquals("abc", row.getString("v2"));
private void streamLegacyTables(String legacyVersion) throws Exception
for (int compact = 0; compact <= 1; compact++)
{"Streaming legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
streamLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
streamLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
streamLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
streamLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
private void streamLegacyTable(String tablePattern, String legacyVersion, String compactNameSuffix) throws Exception
String table = String.format(tablePattern, legacyVersion, compactNameSuffix);
SSTableReader sstable =, table));
IPartitioner p = sstable.getPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),
sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
private static void loadLegacyTables(String legacyVersion) throws Exception
for (int compact = 0; compact <= 1; compact++)
{"Preparing legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
loadLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
loadLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
loadLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
loadLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
private static void verifyCache(String legacyVersion, long startCount) throws InterruptedException, java.util.concurrent.ExecutionException
//Validate whether the key cache successfully saves in the presence of old keys as
//well as loads the correct number of keys
long endCount = CacheService.instance.keyCache.size();
Assert.assertTrue(endCount > startCount);
assertEquals(startCount, CacheService.instance.keyCache.size());
if (BigFormat.instance.getVersion(legacyVersion).storeRows())
assertEquals(endCount, CacheService.instance.keyCache.size());
assertEquals(startCount, CacheService.instance.keyCache.size());
private static void verifyReads(String legacyVersion)
for (int compact = 0; compact <= 1; compact++)
for (int ck = 0; ck < 50; ck++)
String ckValue = Integer.toString(ck) + longString;
for (int pk = 0; pk < 5; pk++)
logger.debug("for pk={} ck={}", pk, ck);
String pkValue = Integer.toString(pk);
UntypedResultSet rs;
if (ck == 0)
readSimpleTable(legacyVersion, getCompactNameSuffix(compact), pkValue);
readSimpleCounterTable(legacyVersion, getCompactNameSuffix(compact), pkValue);
readClusteringTable(legacyVersion, getCompactNameSuffix(compact), ck, ckValue, pkValue);
readClusteringCounterTable(legacyVersion, getCompactNameSuffix(compact), ckValue, pkValue);
private static void readClusteringCounterTable(String legacyVersion, String compactSuffix, String ckValue, String pkValue)
logger.debug("Read legacy_{}_clust_counter{}", legacyVersion, compactSuffix);
UntypedResultSet rs;
rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
assertEquals(1, rs.size());
private static void readClusteringTable(String legacyVersion, String compactSuffix, int ck, String ckValue, String pkValue)
logger.debug("Read legacy_{}_clust{}", legacyVersion, compactSuffix);
UntypedResultSet rs;
rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
assertLegacyClustRows(1, rs);
String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString;
String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString;
rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion, compactSuffix), pkValue, ckValue, ckValue2, ckValue3);
assertLegacyClustRows(3, rs);
private static void readSimpleCounterTable(String legacyVersion, String compactSuffix, String pkValue)
logger.debug("Read legacy_{}_simple_counter{}", legacyVersion, compactSuffix);
UntypedResultSet rs;
rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
assertEquals(1, rs.size());
private static void readSimpleTable(String legacyVersion, String compactSuffix, String pkValue)
logger.debug("Read simple: legacy_{}_simple{}", legacyVersion, compactSuffix);
UntypedResultSet rs;
rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
assertEquals(1, rs.size());
assertEquals("foo bar baz","val"));
private static void createKeyspace()
QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
private static void createTables(String legacyVersion)
for (int i=0; i<=1; i++)
String compactSuffix = getCompactNameSuffix(i);
String tableSuffix = i == 0? "" : " WITH COMPACT STORAGE";
QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple%s (pk text PRIMARY KEY, val text)%s", legacyVersion, compactSuffix, tableSuffix));
QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter%s (pk text PRIMARY KEY, val counter)%s", legacyVersion, compactSuffix, tableSuffix));
QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust%s (pk text, ck text, val text, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter%s (pk text, ck text, val counter, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
private static String getCompactNameSuffix(int i)
return i == 0? "" : "_compact";
private static void truncateTables(String legacyVersion)
for (int compact = 0; compact <= 1; compact++)
QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact)));
QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact)));
QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact)));
QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact)));
private static void assertLegacyClustRows(int count, UntypedResultSet rs)
assertEquals(count, rs.size());
for (int i = 0; i < count; i++)
for (UntypedResultSet.Row r : rs)
assertEquals(128, r.getString("val").length());
private static void loadLegacyTable(String tablePattern, String legacyVersion, String compactSuffix) throws IOException
String table = String.format(tablePattern, legacyVersion, compactSuffix);"Loading legacy table {}", table);
ColumnFamilyStore cfs ="legacy_tables").getColumnFamilyStore(table);
for (File cfDir : cfs.getDirectories().getCFDirectories())
copySstablesToTestData(legacyVersion, table, cfDir);
* Test for CASSANDRA-15778
public void testReadLegacyCqlCreatedTableWithBytes() throws Exception {
String table = "legacy_ka_cql_created_dense_table_with_bytes";
QueryProcessor.executeInternal("CREATE TABLE legacy_tables." + table + " (" +
" k int," +
" v text," +
loadLegacyTable("legacy_%s_cql_created_dense_table_with_bytes%s", "ka", "");
QueryProcessor.executeInternal("ALTER TABLE legacy_tables." + table + " ALTER value TYPE 'org.apache.cassandra.db.marshal.BytesType';");
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM legacy_tables." + table);
assertEquals(1, rs.size());
assertEquals(ByteBufferUtil.bytes("byte string"),"value"));
* Test for CASSANDRA-15778
public void testReadLegacyCqlCreatedTableWithInt() throws Exception {
String table = "legacy_ka_cql_created_dense_table_with_int";
QueryProcessor.executeInternal("CREATE TABLE legacy_tables." + table + " (" +
" k int," +
" v text," +
loadLegacyTable("legacy_%s_cql_created_dense_table_with_int%s", "ka", "");
QueryProcessor.executeInternal("ALTER TABLE legacy_tables." + table + " ALTER value TYPE 'org.apache.cassandra.db.marshal.BytesType';");
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM legacy_tables." + table);
assertEquals(1, rs.size());
* Generates sstables for 8 CQL tables (see {@link #createTables(String)}) in <i>current</i>
* sstable format (version) into {@code test/data/legacy-sstables/VERSION}, where
* {@code VERSION} matches {@link Version#getVersion() BigFormat.latestVersion.getVersion()}.
* <p>
* Run this test alone (e.g. from your IDE) when a new version is introduced or format changed
* during development. I.e. remove the {@code @Ignore} annotation temporarily.
* </p>
public void testGenerateSstables() throws Throwable
Random rand = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 128; i++)
sb.append((char)('a' + rand.nextInt(26)));
String randomString = sb.toString();
for (int compact = 0; compact <= 1; compact++)
for (int pk = 0; pk < 5; pk++)
String valPk = Integer.toString(pk);
QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple%s (pk, val) VALUES ('%s', '%s')",
BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, "foo bar baz"));
QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter%s SET val = val + 1 WHERE pk = '%s'",
BigFormat.latestVersion, getCompactNameSuffix(compact), valPk));
for (int ck = 0; ck < 50; ck++)
String valCk = Integer.toString(ck);
QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust%s (pk, ck, val) VALUES ('%s', '%s', '%s')",
BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString, randomString));
QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter%s SET val = val + 1 WHERE pk = '%s' AND ck='%s'",
BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString));
File ksDir = new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables", BigFormat.latestVersion));
for (int compact = 0; compact <= 1; compact++)
copySstablesFromTestData(String.format("legacy_%s_simple%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
copySstablesFromTestData(String.format("legacy_%s_simple_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
copySstablesFromTestData(String.format("legacy_%s_clust%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
copySstablesFromTestData(String.format("legacy_%s_clust_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
public static void copySstablesFromTestData(String table, File ksDir) throws IOException
File cfDir = new File(ksDir, table);
for (File srcDir :"legacy_tables").getColumnFamilyStore(table).getDirectories().getCFDirectories())
for (File file : srcDir.listFiles())
copyFile(cfDir, file);
private static void copySstablesToTestData(String legacyVersion, String table, File cfDir) throws IOException
for (File file : getTableDir(legacyVersion, table).listFiles())
copyFile(cfDir, file);
private static File getTableDir(String legacyVersion, String table)
return new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table));
private static void copyFile(File cfDir, File file) throws IOException
byte[] buf = new byte[65536];
if (file.isFile())
File target = new File(cfDir, file.getName());
int rd;
FileInputStream is = new FileInputStream(file);
FileOutputStream os = new FileOutputStream(target);
while ((rd = >= 0)
os.write(buf, 0, rd);