| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.io.sstable; |
| |
| |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Random; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Iterables; |
| import org.apache.cassandra.io.util.File; |
| import org.apache.cassandra.io.util.FileInputStreamPlus; |
| import org.apache.cassandra.io.util.FileOutputStreamPlus; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.cql3.QueryProcessor; |
| import org.apache.cassandra.cql3.UntypedResultSet; |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.db.compaction.AbstractCompactionTask; |
| import org.apache.cassandra.db.compaction.CompactionManager; |
| import org.apache.cassandra.db.repair.PendingAntiCompaction; |
| import org.apache.cassandra.db.streaming.CassandraOutgoingFile; |
| import org.apache.cassandra.db.SinglePartitionSliceCommandTest; |
| import org.apache.cassandra.db.compaction.Verifier; |
| import org.apache.cassandra.db.rows.RangeTombstoneMarker; |
| import org.apache.cassandra.db.rows.Unfiltered; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.sstable.format.Version; |
| import org.apache.cassandra.io.sstable.format.big.BigFormat; |
| import org.apache.cassandra.service.CacheService; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.streaming.OutgoingStream; |
| import org.apache.cassandra.streaming.StreamPlan; |
| import org.apache.cassandra.streaming.StreamOperation; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.TimeUUID; |
| |
| import static java.util.Collections.singleton; |
| import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; |
| import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; |
| import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Tests backwards compatibility for SSTables |
| */ |
| public class LegacySSTableTest |
| { |
| private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class); |
| |
| public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root"; |
| |
| public static File LEGACY_SSTABLE_ROOT; |
| |
| /** |
| * When adding a new sstable version, add that one here. |
| * See {@link #testGenerateSstables()} to generate sstables. |
| * Take care on commit as you need to add the sstable files using {@code git add -f} |
| */ |
| public static final String[] legacyVersions = {"nb", "na", "me", "md", "mc", "mb", "ma"}; |
| |
| // 1200 chars |
| static final String longString = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + |
| "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; |
| |
| @BeforeClass |
| public static void defineSchema() throws ConfigurationException |
| { |
| String scp = System.getProperty(LEGACY_SSTABLE_PROP); |
| Assert.assertNotNull("System property " + LEGACY_SSTABLE_PROP + " not set", scp); |
| |
| LEGACY_SSTABLE_ROOT = new File(scp).toAbsolute(); |
| Assert.assertTrue("System property " + LEGACY_SSTABLE_ROOT + " does not specify a directory", LEGACY_SSTABLE_ROOT.isDirectory()); |
| |
| SchemaLoader.prepareServer(); |
| StorageService.instance.initServer(); |
| Keyspace.setInitialized(); |
| createKeyspace(); |
| for (String legacyVersion : legacyVersions) |
| { |
| createTables(legacyVersion); |
| } |
| |
| } |
| |
| @After |
| public void tearDown() |
| { |
| for (String legacyVersion : legacyVersions) |
| { |
| truncateTables(legacyVersion); |
| } |
| } |
| |
| /** |
| * Get a descriptor for the legacy sstable at the given version. |
| */ |
| protected Descriptor getDescriptor(String legacyVersion, String table) throws IOException |
| { |
| Path file = Files.list(getTableDir(legacyVersion, table).toPath()).findFirst().orElseThrow(() -> new RuntimeException(String.format("No files for version=%s and table=%s", legacyVersion, table))); |
| return Descriptor.fromFilename(new File(file)); |
| } |
| |
| @Test |
| public void testLoadLegacyCqlTables() throws Exception |
| { |
| DatabaseDescriptor.setColumnIndexCacheSize(99999); |
| CacheService.instance.invalidateKeyCache(); |
| doTestLegacyCqlTables(); |
| } |
| |
| @Test |
| public void testLoadLegacyCqlTablesShallow() throws Exception |
| { |
| DatabaseDescriptor.setColumnIndexCacheSize(0); |
| CacheService.instance.invalidateKeyCache(); |
| doTestLegacyCqlTables(); |
| } |
| |
| @Test |
| public void testMutateMetadata() throws Exception |
| { |
| // we need to make sure we write old version metadata in the format for that version |
| for (String legacyVersion : legacyVersions) |
| { |
| logger.info("Loading legacy version: {}", legacyVersion); |
| truncateLegacyTables(legacyVersion); |
| loadLegacyTables(legacyVersion); |
| CacheService.instance.invalidateKeyCache(); |
| |
| for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores()) |
| { |
| for (SSTableReader sstable : cfs.getLiveSSTables()) |
| { |
| sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1234, NO_PENDING_REPAIR, false); |
| sstable.reloadSSTableMetadata(); |
| assertEquals(1234, sstable.getRepairedAt()); |
| if (sstable.descriptor.version.hasPendingRepair()) |
| assertEquals(NO_PENDING_REPAIR, sstable.getPendingRepair()); |
| } |
| |
| boolean isTransient = false; |
| for (SSTableReader sstable : cfs.getLiveSSTables()) |
| { |
| TimeUUID random = nextTimeUUID(); |
| sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, UNREPAIRED_SSTABLE, random, isTransient); |
| sstable.reloadSSTableMetadata(); |
| assertEquals(UNREPAIRED_SSTABLE, sstable.getRepairedAt()); |
| if (sstable.descriptor.version.hasPendingRepair()) |
| assertEquals(random, sstable.getPendingRepair()); |
| if (sstable.descriptor.version.hasIsTransient()) |
| assertEquals(isTransient, sstable.isTransient()); |
| |
| isTransient = !isTransient; |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testMutateMetadataCSM() throws Exception |
| { |
| // we need to make sure we write old version metadata in the format for that version |
| for (String legacyVersion : legacyVersions) |
| { |
| // Skip 2.0.1 sstables as it doesn't have repaired information |
| if (legacyVersion.equals("jb")) |
| continue; |
| truncateTables(legacyVersion); |
| loadLegacyTables(legacyVersion); |
| |
| for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores()) |
| { |
| // set pending |
| for (SSTableReader sstable : cfs.getLiveSSTables()) |
| { |
| TimeUUID random = nextTimeUUID(); |
| try |
| { |
| cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, random, false); |
| if (!sstable.descriptor.version.hasPendingRepair()) |
| fail("We should fail setting pending repair on unsupported sstables "+sstable); |
| } |
| catch (IllegalStateException e) |
| { |
| if (sstable.descriptor.version.hasPendingRepair()) |
| fail("We should succeed setting pending repair on "+legacyVersion + " sstables, failed on "+sstable); |
| } |
| } |
| // set transient |
| for (SSTableReader sstable : cfs.getLiveSSTables()) |
| { |
| try |
| { |
| cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, nextTimeUUID(), true); |
| if (!sstable.descriptor.version.hasIsTransient()) |
| fail("We should fail setting pending repair on unsupported sstables "+sstable); |
| } |
| catch (IllegalStateException e) |
| { |
| if (sstable.descriptor.version.hasIsTransient()) |
| fail("We should succeed setting pending repair on "+legacyVersion + " sstables, failed on "+sstable); |
| } |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testMutateLevel() throws Exception |
| { |
| // we need to make sure we write old version metadata in the format for that version |
| for (String legacyVersion : legacyVersions) |
| { |
| logger.info("Loading legacy version: {}", legacyVersion); |
| truncateLegacyTables(legacyVersion); |
| loadLegacyTables(legacyVersion); |
| CacheService.instance.invalidateKeyCache(); |
| |
| for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores()) |
| { |
| for (SSTableReader sstable : cfs.getLiveSSTables()) |
| { |
| sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1234); |
| sstable.reloadSSTableMetadata(); |
| assertEquals(1234, sstable.getSSTableLevel()); |
| } |
| } |
| } |
| } |
| |
| private void doTestLegacyCqlTables() throws Exception |
| { |
| for (String legacyVersion : legacyVersions) |
| { |
| logger.info("Loading legacy version: {}", legacyVersion); |
| truncateLegacyTables(legacyVersion); |
| loadLegacyTables(legacyVersion); |
| CacheService.instance.invalidateKeyCache(); |
| long startCount = CacheService.instance.keyCache.size(); |
| verifyReads(legacyVersion); |
| verifyCache(legacyVersion, startCount); |
| compactLegacyTables(legacyVersion); |
| } |
| } |
| |
| @Test |
| public void testStreamLegacyCqlTables() throws Exception |
| { |
| for (String legacyVersion : legacyVersions) |
| { |
| streamLegacyTables(legacyVersion); |
| verifyReads(legacyVersion); |
| } |
| } |
| |
| @Test |
| public void testInaccurateSSTableMinMax() throws Exception |
| { |
| QueryProcessor.executeInternal("CREATE TABLE legacy_tables.legacy_mc_inaccurate_min_max (k int, c1 int, c2 int, c3 int, v int, primary key (k, c1, c2, c3))"); |
| loadLegacyTable("legacy_%s_inaccurate_min_max", "mc"); |
| |
| /* |
| sstable has the following mutations: |
| INSERT INTO legacy_tables.legacy_mc_inaccurate_min_max (k, c1, c2, c3, v) VALUES (100, 4, 4, 4, 4) |
| DELETE FROM legacy_tables.legacy_mc_inaccurate_min_max WHERE k=100 AND c1<3 |
| */ |
| |
| String query = "SELECT * FROM legacy_tables.legacy_mc_inaccurate_min_max WHERE k=100 AND c1=1 AND c2=1"; |
| List<Unfiltered> unfiltereds = SinglePartitionSliceCommandTest.getUnfilteredsFromSinglePartition(query); |
| Assert.assertEquals(2, unfiltereds.size()); |
| Assert.assertTrue(unfiltereds.get(0).isRangeTombstoneMarker()); |
| Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(0)).isOpen(false)); |
| Assert.assertTrue(unfiltereds.get(1).isRangeTombstoneMarker()); |
| Assert.assertTrue(((RangeTombstoneMarker) unfiltereds.get(1)).isClose(false)); |
| } |
| |
| @Test |
| public void testVerifyOldSSTables() throws IOException |
| { |
| for (String legacyVersion : legacyVersions) |
| { |
| ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)); |
| loadLegacyTable("legacy_%s_simple", legacyVersion); |
| |
| for (SSTableReader sstable : cfs.getLiveSSTables()) |
| { |
| try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().checkVersion(true).build())) |
| { |
| verifier.verify(); |
| if (!sstable.descriptor.version.isLatestVersion()) |
| fail("Verify should throw RuntimeException for old sstables "+sstable); |
| } |
| catch (RuntimeException e) |
| {} |
| } |
| // make sure we don't throw any exception if not checking version: |
| for (SSTableReader sstable : cfs.getLiveSSTables()) |
| { |
| try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().checkVersion(false).build())) |
| { |
| verifier.verify(); |
| } |
| catch (Throwable e) |
| { |
| fail("Verify should throw RuntimeException for old sstables "+sstable); |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testPendingAntiCompactionOldSSTables() throws Exception |
| { |
| for (String legacyVersion : legacyVersions) |
| { |
| ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)); |
| loadLegacyTable("legacy_%s_simple", legacyVersion); |
| |
| boolean shouldFail = !cfs.getLiveSSTables().stream().allMatch(sstable -> sstable.descriptor.version.hasPendingRepair()); |
| IPartitioner p = Iterables.getFirst(cfs.getLiveSSTables(), null).getPartitioner(); |
| Range<Token> r = new Range<>(p.getMinimumToken(), p.getMinimumToken()); |
| PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, singleton(r), nextTimeUUID(), 0, 0); |
| PendingAntiCompaction.AcquireResult res = acquisitionCallable.call(); |
| assertEquals(shouldFail, res == null); |
| if (res != null) |
| res.abort(); |
| } |
| } |
| |
| @Test |
| public void testAutomaticUpgrade() throws Exception |
| { |
| for (String legacyVersion : legacyVersions) |
| { |
| logger.info("Loading legacy version: {}", legacyVersion); |
| truncateLegacyTables(legacyVersion); |
| loadLegacyTables(legacyVersion); |
| ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)); |
| AbstractCompactionTask act = cfs.getCompactionStrategyManager().getNextBackgroundTask(0); |
| // there should be no compactions to run with auto upgrades disabled: |
| assertEquals(null, act); |
| } |
| |
| DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true); |
| for (String legacyVersion : legacyVersions) |
| { |
| logger.info("Loading legacy version: {}", legacyVersion); |
| truncateLegacyTables(legacyVersion); |
| loadLegacyTables(legacyVersion); |
| ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)); |
| if (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion())) |
| assertTrue(cfs.metric.oldVersionSSTableCount.getValue() > 0); |
| while (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion())) |
| { |
| CompactionManager.instance.submitBackground(cfs); |
| Thread.sleep(100); |
| } |
| assertTrue(cfs.metric.oldVersionSSTableCount.getValue() == 0); |
| } |
| DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false); |
| } |
| |
| private void streamLegacyTables(String legacyVersion) throws Exception |
| { |
| logger.info("Streaming legacy version {}", legacyVersion); |
| streamLegacyTable("legacy_%s_simple", legacyVersion); |
| streamLegacyTable("legacy_%s_simple_counter", legacyVersion); |
| streamLegacyTable("legacy_%s_clust", legacyVersion); |
| streamLegacyTable("legacy_%s_clust_counter", legacyVersion); |
| } |
| |
| private void streamLegacyTable(String tablePattern, String legacyVersion) throws Exception |
| { |
| String table = String.format(tablePattern, legacyVersion); |
| SSTableReader sstable = SSTableReader.open(getDescriptor(legacyVersion, table)); |
| IPartitioner p = sstable.getPartitioner(); |
| List<Range<Token>> ranges = new ArrayList<>(); |
| ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100")))); |
| ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken())); |
| List<OutgoingStream> streams = Lists.newArrayList(new CassandraOutgoingFile(StreamOperation.OTHER, |
| sstable.ref(), |
| sstable.getPositionsForRanges(ranges), |
| ranges, |
| sstable.estimatedKeysForRanges(ranges))); |
| new StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(), streams).execute().get(); |
| } |
| |
| public static void truncateLegacyTables(String legacyVersion) throws Exception |
| { |
| logger.info("Truncating legacy version {}", legacyVersion); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)).truncateBlocking(); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple_counter", legacyVersion)).truncateBlocking(); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust", legacyVersion)).truncateBlocking(); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust_counter", legacyVersion)).truncateBlocking(); |
| } |
| |
| private static void compactLegacyTables(String legacyVersion) throws Exception |
| { |
| logger.info("Compacting legacy version {}", legacyVersion); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)).forceMajorCompaction(); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple_counter", legacyVersion)).forceMajorCompaction(); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust", legacyVersion)).forceMajorCompaction(); |
| Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_clust_counter", legacyVersion)).forceMajorCompaction(); |
| } |
| |
| public static void loadLegacyTables(String legacyVersion) throws Exception |
| { |
| logger.info("Preparing legacy version {}", legacyVersion); |
| loadLegacyTable("legacy_%s_simple", legacyVersion); |
| loadLegacyTable("legacy_%s_simple_counter", legacyVersion); |
| loadLegacyTable("legacy_%s_clust", legacyVersion); |
| loadLegacyTable("legacy_%s_clust_counter", legacyVersion); |
| } |
| |
| private static void verifyCache(String legacyVersion, long startCount) throws InterruptedException, java.util.concurrent.ExecutionException |
| { |
| //For https://issues.apache.org/jira/browse/CASSANDRA-10778 |
| //Validate whether the key cache successfully saves in the presence of old keys as |
| //well as loads the correct number of keys |
| long endCount = CacheService.instance.keyCache.size(); |
| Assert.assertTrue(endCount > startCount); |
| CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get(); |
| CacheService.instance.invalidateKeyCache(); |
| Assert.assertEquals(startCount, CacheService.instance.keyCache.size()); |
| CacheService.instance.keyCache.loadSaved(); |
| Assert.assertEquals(endCount, CacheService.instance.keyCache.size()); |
| } |
| |
| private static void verifyReads(String legacyVersion) |
| { |
| for (int ck = 0; ck < 50; ck++) |
| { |
| String ckValue = Integer.toString(ck) + longString; |
| for (int pk = 0; pk < 5; pk++) |
| { |
| logger.debug("for pk={} ck={}", pk, ck); |
| |
| String pkValue = Integer.toString(pk); |
| if (ck == 0) |
| { |
| readSimpleTable(legacyVersion, pkValue); |
| readSimpleCounterTable(legacyVersion, pkValue); |
| } |
| |
| readClusteringTable(legacyVersion, ck, ckValue, pkValue); |
| readClusteringCounterTable(legacyVersion, ckValue, pkValue); |
| } |
| } |
| } |
| |
| private static void readClusteringCounterTable(String legacyVersion, String ckValue, String pkValue) |
| { |
| logger.debug("Read legacy_{}_clust_counter", legacyVersion); |
| UntypedResultSet rs; |
| rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue); |
| Assert.assertNotNull(rs); |
| Assert.assertEquals(1, rs.size()); |
| Assert.assertEquals(1L, rs.one().getLong("val")); |
| } |
| |
| private static void readClusteringTable(String legacyVersion, int ck, String ckValue, String pkValue) |
| { |
| logger.debug("Read legacy_{}_clust", legacyVersion); |
| UntypedResultSet rs; |
| rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue); |
| assertLegacyClustRows(1, rs); |
| |
| String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString; |
| String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString; |
| rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion), pkValue, ckValue, ckValue2, ckValue3); |
| assertLegacyClustRows(3, rs); |
| } |
| |
| private static void readSimpleCounterTable(String legacyVersion, String pkValue) |
| { |
| logger.debug("Read legacy_{}_simple_counter", legacyVersion); |
| UntypedResultSet rs; |
| rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter WHERE pk=?", legacyVersion), pkValue); |
| Assert.assertNotNull(rs); |
| Assert.assertEquals(1, rs.size()); |
| Assert.assertEquals(1L, rs.one().getLong("val")); |
| } |
| |
| private static void readSimpleTable(String legacyVersion, String pkValue) |
| { |
| logger.debug("Read simple: legacy_{}_simple", legacyVersion); |
| UntypedResultSet rs; |
| rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple WHERE pk=?", legacyVersion), pkValue); |
| Assert.assertNotNull(rs); |
| Assert.assertEquals(1, rs.size()); |
| Assert.assertEquals("foo bar baz", rs.one().getString("val")); |
| } |
| |
| private static void createKeyspace() |
| { |
| QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}"); |
| } |
| |
| private static void createTables(String legacyVersion) |
| { |
| QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion)); |
| QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion)); |
| QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion)); |
| QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion)); |
| } |
| |
| private static void truncateTables(String legacyVersion) |
| { |
| QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple", legacyVersion)); |
| QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple_counter", legacyVersion)); |
| QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust", legacyVersion)); |
| QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust_counter", legacyVersion)); |
| CacheService.instance.invalidateCounterCache(); |
| CacheService.instance.invalidateKeyCache(); |
| } |
| |
| private static void assertLegacyClustRows(int count, UntypedResultSet rs) |
| { |
| Assert.assertNotNull(rs); |
| Assert.assertEquals(count, rs.size()); |
| for (int i = 0; i < count; i++) |
| { |
| for (UntypedResultSet.Row r : rs) |
| { |
| Assert.assertEquals(128, r.getString("val").length()); |
| } |
| } |
| } |
| |
| private static void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException |
| { |
| String table = String.format(tablePattern, legacyVersion); |
| |
| logger.info("Loading legacy table {}", table); |
| |
| ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(table); |
| |
| for (File cfDir : cfs.getDirectories().getCFDirectories()) |
| { |
| copySstablesToTestData(legacyVersion, table, cfDir); |
| } |
| |
| cfs.loadNewSSTables(); |
| } |
| |
| /** |
| * Generates sstables for 8 CQL tables (see {@link #createTables(String)}) in <i>current</i> |
| * sstable format (version) into {@code test/data/legacy-sstables/VERSION}, where |
| * {@code VERSION} matches {@link Version#getVersion() BigFormat.latestVersion.getVersion()}. |
| * <p> |
| * Run this test alone (e.g. from your IDE) when a new version is introduced or format changed |
| * during development. I.e. remove the {@code @Ignore} annotation temporarily. |
| * </p> |
| */ |
| @Ignore |
| @Test |
| public void testGenerateSstables() throws Throwable |
| { |
| Random rand = new Random(); |
| StringBuilder sb = new StringBuilder(); |
| for (int i = 0; i < 128; i++) |
| { |
| sb.append((char)('a' + rand.nextInt(26))); |
| } |
| String randomString = sb.toString(); |
| |
| for (int pk = 0; pk < 5; pk++) |
| { |
| String valPk = Integer.toString(pk); |
| QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple (pk, val) VALUES ('%s', '%s')", |
| BigFormat.latestVersion, valPk, "foo bar baz")); |
| |
| QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter SET val = val + 1 WHERE pk = '%s'", |
| BigFormat.latestVersion, valPk)); |
| |
| for (int ck = 0; ck < 50; ck++) |
| { |
| String valCk = Integer.toString(ck); |
| |
| QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust (pk, ck, val) VALUES ('%s', '%s', '%s')", |
| BigFormat.latestVersion, valPk, valCk + longString, randomString)); |
| |
| QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter SET val = val + 1 WHERE pk = '%s' AND ck='%s'", |
| BigFormat.latestVersion, valPk, valCk + longString)); |
| } |
| } |
| |
| StorageService.instance.forceKeyspaceFlush("legacy_tables", ColumnFamilyStore.FlushReason.UNIT_TESTS); |
| |
| File ksDir = new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables", BigFormat.latestVersion)); |
| ksDir.tryCreateDirectories(); |
| copySstablesFromTestData(String.format("legacy_%s_simple", BigFormat.latestVersion), ksDir); |
| copySstablesFromTestData(String.format("legacy_%s_simple_counter", BigFormat.latestVersion), ksDir); |
| copySstablesFromTestData(String.format("legacy_%s_clust", BigFormat.latestVersion), ksDir); |
| copySstablesFromTestData(String.format("legacy_%s_clust_counter", BigFormat.latestVersion), ksDir); |
| } |
| |
| public static void copySstablesFromTestData(String table, File ksDir) throws IOException |
| { |
| File cfDir = new File(ksDir, table); |
| cfDir.tryCreateDirectory(); |
| |
| for (File srcDir : Keyspace.open("legacy_tables").getColumnFamilyStore(table).getDirectories().getCFDirectories()) |
| { |
| for (File file : srcDir.tryList()) |
| { |
| copyFile(cfDir, file); |
| } |
| } |
| } |
| |
| private static void copySstablesToTestData(String legacyVersion, String table, File cfDir) throws IOException |
| { |
| File tableDir = getTableDir(legacyVersion, table); |
| Assert.assertTrue("The table directory " + tableDir + " was not found", tableDir.isDirectory()); |
| for (File file : tableDir.tryList()) |
| { |
| copyFile(cfDir, file); |
| } |
| } |
| |
| private static File getTableDir(String legacyVersion, String table) |
| { |
| return new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)); |
| } |
| |
| private static void copyFile(File cfDir, File file) throws IOException |
| { |
| byte[] buf = new byte[65536]; |
| if (file.isFile()) |
| { |
| File target = new File(cfDir, file.name()); |
| int rd; |
| try (FileInputStreamPlus is = new FileInputStreamPlus(file); |
| FileOutputStreamPlus os = new FileOutputStreamPlus(target);) { |
| while ((rd = is.read(buf)) >= 0) |
| os.write(buf, 0, rd); |
| } |
| } |
| } |
| } |