| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.lang3.StringUtils; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.cache.RowCacheKey; |
| import org.apache.cassandra.cql3.CQLTester; |
| import org.apache.cassandra.cql3.UntypedResultSet; |
| import org.apache.cassandra.db.lifecycle.LifecycleTransaction; |
| import org.apache.cassandra.dht.BootStrapper; |
| import org.apache.cassandra.io.sstable.Component; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.locator.InetAddressAndPort; |
| import org.apache.cassandra.locator.TokenMetadata; |
| import org.apache.cassandra.service.CacheService; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| public class ImportTest extends CQLTester |
| { |
| |
| @Test |
| public void basicImportByMovingTest() throws Throwable |
| { |
| File backupDir = prepareBasicImporting(); |
| // copy is false - so importing will be done by moving |
| importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(false).build(), 10); |
| // files were moved |
| Assert.assertEquals(0, countFiles(backupDir)); |
| } |
| |
| @Test |
| public void basicImportByCopyingTest() throws Throwable |
| { |
| File backupDir = prepareBasicImporting(); |
| // copy is true - so importing will be done by copying |
| importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(true).build(), 10); |
| // files are left there as they were just copied |
| Assert.assertNotEquals(0, countFiles(backupDir)); |
| } |
| |
| private File prepareBasicImporting() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| |
| |
| for (int i = 0; i < 10; i++) |
| { |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| } |
| |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| File backupdir = moveToBackupDir(sstables); |
| |
| assertEquals(0, execute("select * from %s").size()); |
| |
| return backupdir; |
| } |
| |
| private List<String> importSSTables(SSTableImporter.Options options, int expectedRows) throws Throwable { |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| List<String> failedDirectories = importer.importNewSSTables(options); |
| assertEquals(expectedRows, execute("select * from %s").size()); |
| return failedDirectories; |
| } |
| |
| @Test |
| public void basicImportMultiDirTest() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| File backupdir = moveToBackupDir(sstables); |
| for (int i = 10; i < 20; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| File backupdir2 = moveToBackupDir(sstables); |
| |
| assertEquals(0, execute("select * from %s").size()); |
| |
| SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdir2.toString())).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| importer.importNewSSTables(options); |
| |
| assertEquals(20, execute("select * from %s").size()); |
| |
| } |
| |
| |
| @Test |
| @Deprecated |
| public void refreshTest() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| sstables.forEach(s -> s.selfRef().release()); |
| assertEquals(0, execute("select * from %s").size()); |
| getCurrentColumnFamilyStore().loadNewSSTables(); |
| assertEquals(10, execute("select * from %s").size()); |
| } |
| |
| @Test |
| public void importResetLevelTest() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| for (SSTableReader sstable : sstables) |
| sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 8); |
| File backupdir = moveToBackupDir(sstables); |
| assertEquals(0, execute("select * from %s").size()); |
| |
| SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| importer.importNewSSTables(options); |
| |
| assertEquals(10, execute("select * from %s").size()); |
| sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| assertEquals(1, sstables.size()); |
| for (SSTableReader sstable : sstables) |
| assertEquals(8, sstable.getSSTableLevel()); |
| |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| backupdir = moveToBackupDir(sstables); |
| |
| options = SSTableImporter.Options.options(backupdir.toString()).resetLevel(true).build(); |
| importer.importNewSSTables(options); |
| |
| sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| assertEquals(1, sstables.size()); |
| for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables()) |
| assertEquals(0, sstable.getSSTableLevel()); |
| } |
| |
| |
| @Test |
| public void importClearRepairedTest() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| for (SSTableReader sstable : sstables) |
| sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 111, null, false); |
| |
| File backupdir = moveToBackupDir(sstables); |
| assertEquals(0, execute("select * from %s").size()); |
| |
| SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| importer.importNewSSTables(options); |
| |
| assertEquals(10, execute("select * from %s").size()); |
| sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| assertEquals(1, sstables.size()); |
| for (SSTableReader sstable : sstables) |
| assertTrue(sstable.isRepaired()); |
| |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| backupdir = moveToBackupDir(sstables); |
| |
| options = SSTableImporter.Options.options(backupdir.toString()).clearRepaired(true).build(); |
| importer.importNewSSTables(options); |
| sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| assertEquals(1, sstables.size()); |
| for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables()) |
| assertFalse(sstable.isRepaired()); |
| } |
| |
| private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException |
| { |
| Path temp = Files.createTempDirectory("importtest"); |
| SSTableReader sst = sstables.iterator().next(); |
| String tabledir = sst.descriptor.directory.getName(); |
| String ksdir = sst.descriptor.directory.getParentFile().getName(); |
| Path backupdir = createDirectories(temp.toString(), ksdir, tabledir); |
| for (SSTableReader sstable : sstables) |
| { |
| sstable.selfRef().release(); |
| for (File f : sstable.descriptor.directory.listFiles()) |
| { |
| if (f.toString().contains(sstable.descriptor.baseFilename())) |
| { |
| System.out.println("move " + f.toPath() + " to " + backupdir); |
| File moveFileTo = new File(backupdir.toFile(), f.getName()); |
| moveFileTo.deleteOnExit(); |
| Files.move(f.toPath(), moveFileTo.toPath()); |
| } |
| } |
| } |
| return backupdir.toFile(); |
| } |
| |
| private Path createDirectories(String base, String ... subdirs) |
| { |
| File b = new File(base); |
| b.mkdir(); |
| System.out.println("mkdir "+b); |
| b.deleteOnExit(); |
| for (String subdir : subdirs) |
| { |
| b = new File(b, subdir); |
| b.mkdir(); |
| System.out.println("mkdir "+b); |
| b.deleteOnExit(); |
| } |
| return b.toPath(); |
| } |
| |
| @Test |
| public void testGetCorrectDirectory() throws Throwable |
| { |
| TokenMetadata metadata = StorageService.instance.getTokenMetadata(); |
| metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddressAndPort()); |
| createTable("create table %s (id int primary key, d int)"); |
| getCurrentColumnFamilyStore().disableAutoCompaction(); |
| |
| // generate sstables with different first tokens |
| for (int i = 0; i < 10; i++) |
| { |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| } |
| |
| Set<SSTableReader> toMove = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| File dir = moveToBackupDir(toMove); |
| |
| Directories dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")), |
| new Directories.DataDirectory(new File("/tmp/2")), |
| new Directories.DataDirectory(new File("/tmp/3")))); |
| MockCFS mock = new MockCFS(getCurrentColumnFamilyStore(), dirs); |
| SSTableImporter importer = new SSTableImporter(mock); |
| |
| importer.importNewSSTables(SSTableImporter.Options.options(dir.toString()).build()); |
| for (SSTableReader sstable : mock.getLiveSSTables()) |
| { |
| File movedDir = sstable.descriptor.directory.getCanonicalFile(); |
| File correctDir = mock.getDiskBoundaries().getCorrectDiskForSSTable(sstable).location.getCanonicalFile(); |
| assertTrue(movedDir.toString().startsWith(correctDir.toString())); |
| } |
| for (SSTableReader sstable : mock.getLiveSSTables()) |
| sstable.selfRef().release(); |
| } |
| |
| private void testCorruptHelper(boolean verify, boolean copy) throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| SSTableReader sstableToCorrupt = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next(); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i + 10, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| String filenameToCorrupt = sstableToCorrupt.descriptor.filenameFor(Component.STATS); |
| try (RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, "rw")) |
| { |
| file.seek(0); |
| file.writeBytes(StringUtils.repeat('z', 2)); |
| } |
| |
| File backupdir = moveToBackupDir(sstables); |
| |
| // now move a correct sstable to another directory to make sure that directory gets properly imported |
| for (int i = 100; i < 130; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> correctSSTables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| File backupdirCorrect = moveToBackupDir(correctSSTables); |
| |
| Set<File> beforeImport = Sets.newHashSet(backupdir.listFiles()); |
| // first we moved out 2 sstables, one correct and one corrupt in to a single directory (backupdir) |
| // then we moved out 1 sstable, a correct one (in backupdirCorrect). |
| // now import should fail import on backupdir, but import the one in backupdirCorrect. |
| SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdirCorrect.toString())).copyData(copy).verifySSTables(verify).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| List<String> failedDirectories = importer.importNewSSTables(options); |
| assertEquals(Collections.singletonList(backupdir.toString()), failedDirectories); |
| UntypedResultSet res = execute("SELECT * FROM %s"); |
| for (UntypedResultSet.Row r : res) |
| { |
| int pk = r.getInt("id"); |
| assertTrue("pk = "+pk, pk >= 100 && pk < 130); |
| } |
| assertEquals("Data dir should contain one file", 1, countFiles(getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables())); |
| assertEquals("backupdir contained 2 files before import, should still contain 2 after failing to import it", beforeImport, Sets.newHashSet(backupdir.listFiles())); |
| if (copy) |
| { |
| assertEquals("backupdirCorrect contained 1 file before import, should contain 1 after import too", 1, countFiles(backupdirCorrect)); |
| } |
| else |
| { |
| assertEquals("backupdirCorrect contained 1 file before import, should be empty after import", 0, countFiles(backupdirCorrect)); |
| } |
| |
| } |
| |
| private int countFiles(File dir) |
| { |
| int fileCount = 0; |
| |
| for (File f : dir.listFiles()) |
| { |
| if (f.isFile() && f.toString().contains("-Data.db")) |
| { |
| fileCount++; |
| } |
| } |
| return fileCount; |
| } |
| |
| @Test |
| public void testImportCorrupt() throws Throwable |
| { |
| testCorruptHelper(true, false); |
| } |
| |
| @Test |
| public void testImportCorruptWithCopying() throws Throwable |
| { |
| testCorruptHelper(true, true); |
| } |
| |
| @Test |
| public void testImportCorruptWithoutValidation() throws Throwable |
| { |
| testCorruptHelper(false, false); |
| } |
| |
| @Test |
| public void testImportCorruptWithoutValidationWithCopying() throws Throwable |
| { |
| testCorruptHelper(false, true); |
| } |
| |
| @Test |
| public void testImportOutOfRange() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 1000; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| TokenMetadata tmd = StorageService.instance.getTokenMetadata(); |
| |
| tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1")); |
| tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2")); |
| tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3")); |
| |
| |
| File backupdir = moveToBackupDir(sstables); |
| try |
| { |
| SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| List<String> failed = importer.importNewSSTables(options); |
| assertEquals(Collections.singletonList(backupdir.toString()), failed); |
| |
| // verify that we check the tokens if verifySSTables == false but verifyTokens == true: |
| options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(false).verifyTokens(true).build(); |
| importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| failed = importer.importNewSSTables(options); |
| assertEquals(Collections.singletonList(backupdir.toString()), failed); |
| |
| // and that we can import with it disabled: |
| options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(false).build(); |
| importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| failed = importer.importNewSSTables(options); |
| assertTrue(failed.isEmpty()); |
| |
| } |
| finally |
| { |
| tmd.clearUnsafe(); |
| } |
| } |
| |
| @Test |
| public void testImportOutOfRangeExtendedVerify() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 1000; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| TokenMetadata tmd = StorageService.instance.getTokenMetadata(); |
| |
| tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1")); |
| tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2")); |
| tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3")); |
| |
| |
| File backupdir = moveToBackupDir(sstables); |
| try |
| { |
| SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()) |
| .verifySSTables(true) |
| .verifyTokens(true) |
| .extendedVerify(true).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| List<String> failedDirectories = importer.importNewSSTables(options); |
| assertEquals(Collections.singletonList(backupdir.toString()), failedDirectories); |
| } |
| finally |
| { |
| tmd.clearUnsafe(); |
| } |
| } |
| |
| |
| @Test |
| public void testImportInvalidateCache() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int) WITH caching = { 'keys': 'NONE', 'rows_per_partition': 'ALL' }"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| CacheService.instance.setRowCacheCapacityInMB(1); |
| |
| Set<RowCacheKey> keysToInvalidate = new HashSet<>(); |
| |
| // populate the row cache with keys from the sstable we are about to remove |
| for (int i = 0; i < 10; i++) |
| { |
| execute("SELECT * FROM %s WHERE id = ?", i); |
| } |
| Iterator<RowCacheKey> it = CacheService.instance.rowCache.keyIterator(); |
| while (it.hasNext()) |
| { |
| keysToInvalidate.add(it.next()); |
| } |
| SSTableReader sstableToImport = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| |
| for (int i = 10; i < 20; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| |
| Set<RowCacheKey> allCachedKeys = new HashSet<>(); |
| |
| // populate row cache with sstable we are keeping |
| for (int i = 10; i < 20; i++) |
| { |
| execute("SELECT * FROM %s WHERE id = ?", i); |
| } |
| it = CacheService.instance.rowCache.keyIterator(); |
| while (it.hasNext()) |
| { |
| allCachedKeys.add(it.next()); |
| } |
| assertEquals(20, CacheService.instance.rowCache.size()); |
| File backupdir = moveToBackupDir(Collections.singleton(sstableToImport)); |
| // make sure we don't wipe caches with invalidateCaches = false: |
| Set<SSTableReader> beforeFirstImport = getCurrentColumnFamilyStore().getLiveSSTables(); |
| |
| SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| importer.importNewSSTables(options); |
| assertEquals(20, CacheService.instance.rowCache.size()); |
| Set<SSTableReader> toMove = Sets.difference(getCurrentColumnFamilyStore().getLiveSSTables(), beforeFirstImport); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| // move away the sstable we just imported again: |
| backupdir = moveToBackupDir(toMove); |
| beforeFirstImport.forEach(s -> s.selfRef().release()); |
| options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).invalidateCaches(true).build(); |
| importer.importNewSSTables(options); |
| assertEquals(10, CacheService.instance.rowCache.size()); |
| it = CacheService.instance.rowCache.keyIterator(); |
| while (it.hasNext()) |
| { |
| // make sure the keys from the sstable we are importing are invalidated and that the other one is still there |
| RowCacheKey rck = it.next(); |
| assertTrue(allCachedKeys.contains(rck)); |
| assertFalse(keysToInvalidate.contains(rck)); |
| } |
| } |
| |
| @Test |
| public void testImportCacheEnabledWithoutSrcDir() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int) WITH caching = { 'keys': 'NONE', 'rows_per_partition': 'ALL' }"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| CacheService.instance.setRowCacheCapacityInMB(1); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| sstables.forEach(s -> s.selfRef().release()); |
| SSTableImporter.Options options = SSTableImporter.Options.options().invalidateCaches(true).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| importer.importNewSSTables(options); |
| assertEquals(1, getCurrentColumnFamilyStore().getLiveSSTables().size()); |
| } |
| |
| @Test |
| public void testRefreshCorrupt() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int) WITH caching = { 'keys': 'NONE', 'rows_per_partition': 'ALL' }"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| sstables.forEach(s -> s.selfRef().release()); |
| // corrupt the sstable which is still in the data directory |
| SSTableReader sstableToCorrupt = sstables.iterator().next(); |
| String filenameToCorrupt = sstableToCorrupt.descriptor.filenameFor(Component.STATS); |
| try (RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, "rw")) |
| { |
| file.seek(0); |
| file.writeBytes(StringUtils.repeat('z', 2)); |
| } |
| |
| for (int i = 10; i < 20; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| for (int i = 20; i < 30; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| |
| Set<SSTableReader> expectedFiles = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables()); |
| |
| SSTableImporter.Options options = SSTableImporter.Options.options().build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| boolean gotException = false; |
| try |
| { |
| importer.importNewSSTables(options); |
| } |
| catch (Throwable t) |
| { |
| gotException = true; |
| } |
| assertTrue(gotException); |
| assertEquals(2, getCurrentColumnFamilyStore().getLiveSSTables().size()); |
| // for nodetool refresh we leave corrupt sstables in the data directory |
| assertEquals(3, countFiles(sstableToCorrupt.descriptor.directory)); |
| int rowCount = 0; |
| for (UntypedResultSet.Row r : execute("SELECT * FROM %s")) |
| { |
| rowCount++; |
| int pk = r.getInt("id"); |
| assertTrue("pk = "+pk, pk >= 10 && pk < 30); |
| } |
| assertEquals(20, rowCount); |
| assertEquals(expectedFiles, getCurrentColumnFamilyStore().getLiveSSTables()); |
| for (SSTableReader sstable : expectedFiles) |
| assertTrue(new File(sstable.descriptor.filenameFor(Component.DATA)).exists()); |
| getCurrentColumnFamilyStore().truncateBlocking(); |
| LifecycleTransaction.waitForDeletions(); |
| for (File f : sstableToCorrupt.descriptor.directory.listFiles()) // clean up the corrupt files which truncate does not handle |
| f.delete(); |
| |
| } |
| |
| /** |
| * If a user gives a bad directory we don't import any directories - we should let the user correct the directories |
| */ |
| @Test |
| public void importBadDirectoryTest() throws Throwable |
| { |
| createTable("create table %s (id int primary key, d int)"); |
| for (int i = 0; i < 10; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| File backupdir = moveToBackupDir(sstables); |
| for (int i = 10; i < 20; i++) |
| execute("insert into %s (id, d) values (?, ?)", i, i); |
| getCurrentColumnFamilyStore().forceBlockingFlush(); |
| sstables = getCurrentColumnFamilyStore().getLiveSSTables(); |
| getCurrentColumnFamilyStore().clearUnsafe(); |
| |
| File backupdir2 = moveToBackupDir(sstables); |
| |
| assertEquals(0, execute("select * from %s").size()); |
| |
| SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdir2.toString(), "/tmp/DOESNTEXIST")).build(); |
| SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore()); |
| boolean gotException = false; |
| try |
| { |
| importer.importNewSSTables(options); |
| } |
| catch (Throwable t) |
| { |
| gotException = true; |
| } |
| assertTrue(gotException); |
| assertEquals(0, execute("select * from %s").size()); |
| assertEquals(0, getCurrentColumnFamilyStore().getLiveSSTables().size()); |
| } |
| |
| private static class MockCFS extends ColumnFamilyStore |
| { |
| public MockCFS(ColumnFamilyStore cfs, Directories dirs) |
| { |
| super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true); |
| } |
| } |
| } |