Merge branch 'cassandra-4.0' into cassandra-4.1
diff --git a/CHANGES.txt b/CHANGES.txt
index 17257d6..4c7d591 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Do not go to disk for reading hints file sizes (CASSANDRA-19477)
  * Fix system_views.settings to handle array types (CASSANDRA-19475)
 Merged from 4.0:
+ * Make nodetool import congruent with the documentation by not relying on the folder structure of the imported SSTable files (CASSANDRA-19401)
  * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
  * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
  * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index b16dd97..3b20d7a 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -904,7 +904,21 @@
 
         public Map<Descriptor, Set<Component>> list()
         {
-            filter();
+            return list(false);
+        }
+
+        /**
+         * This method is used upon SSTable importing (nodetool import) as there is no strict requirement to
+         * place SSTables in a directory structure where name of a directory equals to table name
+         * and parent of such directory equals to keyspace name. For such cases, we want to include such SSTables too,
+         * rendering the parameter to be set to true.
+         *
+         * @param includeForeignTables whether descriptors not matching metadata of this lister should be included
+         * @return found descriptors and related set of components
+         */
+        public Map<Descriptor, Set<Component>> list(boolean includeForeignTables)
+        {
+            filter(includeForeignTables);
             return ImmutableMap.copyOf(components);
         }
 
@@ -922,7 +936,12 @@
 
         public List<File> listFiles()
         {
-            filter();
+            return listFiles(false);
+        }
+
+        public List<File> listFiles(boolean includeForeignTables)
+        {
+            filter(includeForeignTables);
             List<File> l = new ArrayList<>(nbFiles);
             for (Map.Entry<Descriptor, Set<Component>> entry : components.entrySet())
             {
@@ -934,7 +953,7 @@
             return l;
         }
 
-        private void filter()
+        private void filter(boolean includeForeignTables)
         {
             if (filtered)
                 return;
@@ -946,21 +965,21 @@
 
                 if (snapshotName != null)
                 {
-                    LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(), onTxnErr);
+                    LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(includeForeignTables), onTxnErr);
                     continue;
                 }
 
                 if (!onlyBackups)
-                    LifecycleTransaction.getFiles(location.toPath(), getFilter(), onTxnErr);
+                    LifecycleTransaction.getFiles(location.toPath(), getFilter(includeForeignTables), onTxnErr);
 
                 if (includeBackups)
-                    LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(), onTxnErr);
+                    LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(includeForeignTables), onTxnErr);
             }
 
             filtered = true;
         }
 
-        private BiPredicate<File, FileType> getFilter()
+        private BiPredicate<File, FileType> getFilter(boolean includeForeignTables)
         {
             // This function always return false since it adds to the components map
             return (file, type) ->
@@ -978,15 +997,31 @@
                         if (pair == null)
                             return false;
 
+                        Descriptor descriptor = null;
+
                         // we are only interested in the SSTable files that belong to the specific ColumnFamily
                         if (!pair.left.ksname.equals(metadata.keyspace) || !pair.left.cfname.equals(metadata.name))
-                            return false;
+                        {
+                            if (!includeForeignTables)
+                                return false;
 
-                        Set<Component> previous = components.get(pair.left);
+                            descriptor = new Descriptor(pair.left.version,
+                                                        pair.left.directory,
+                                                        metadata.keyspace,
+                                                        metadata.name,
+                                                        pair.left.id,
+                                                        pair.left.formatType);
+                        }
+                        else
+                        {
+                            descriptor = pair.left;
+                        }
+
+                        Set<Component> previous = components.get(descriptor);
                         if (previous == null)
                         {
                             previous = new HashSet<>();
-                            components.put(pair.left, previous);
+                            components.put(descriptor, previous);
                         }
                         previous.add(pair.right);
                         nbFiles++;
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java
index 5949559..8a1e897 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -86,7 +86,7 @@
             {
                 Directories.SSTableLister lister = listerPair.left;
                 String dir = listerPair.right;
-                for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+                for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
                 {
                     Descriptor descriptor = entry.getKey();
                     if (!currentDescriptors.contains(entry.getKey()))
@@ -124,7 +124,7 @@
 
             Set<MovedSSTable> movedSSTables = new HashSet<>();
             Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>();
-            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
             {
                 try
                 {
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java
index 8244ff6..869e9e2 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -31,6 +31,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -56,6 +57,11 @@
 
 public class ImportTest extends CQLTester
 {
+    @After
+    public void afterTest()
+    {
+        SSTableReader.resetTidying();
+    }
 
     @Test
     public void basicImportByMovingTest() throws Throwable
@@ -224,13 +230,10 @@
             assertFalse(sstable.isRepaired());
     }
 
-    private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
+    private File moveToBackupDir(Set<SSTableReader> sstables, String keyspace, String table) throws IOException
     {
         Path temp = Files.createTempDirectory("importtest");
-        SSTableReader sst = sstables.iterator().next();
-        String tabledir = sst.descriptor.directory.name();
-        String ksdir = sst.descriptor.directory.parent().name();
-        Path backupdir = createDirectories(temp.toString(), ksdir, tabledir);
+        Path backupdir = createDirectories(temp.toString(), keyspace, table);
         for (SSTableReader sstable : sstables)
         {
             sstable.selfRef().release();
@@ -248,6 +251,14 @@
         return new File(backupdir);
     }
 
+    private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
+    {
+        SSTableReader sst = sstables.iterator().next();
+        String tabledir = sst.descriptor.directory.name();
+        String ksdir = sst.descriptor.directory.parent().name();
+        return moveToBackupDir(sstables, ksdir, tabledir);
+    }
+
     private Path createDirectories(String base, String ... subdirs)
     {
         File b = new File(base);
@@ -411,9 +422,9 @@
 
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
-        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1"));
-        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2"));
-        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.2"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.3"));
 
 
         File backupdir = moveToBackupDir(sstables);
@@ -536,6 +547,7 @@
         beforeFirstImport.forEach(s -> s.selfRef().release());
         options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).invalidateCaches(true).build();
         importer.importNewSSTables(options);
+        Thread.sleep(2000);
         assertEquals(10, CacheService.instance.rowCache.size());
         it = CacheService.instance.rowCache.keyIterator();
         while (it.hasNext())
@@ -707,6 +719,49 @@
         }
     }
 
+    /**
+     * This test verifies that we successfully import SSTables which are in a directory structure
+     * where table name and keyspace name (current dir name and parent dir name) do not match the keyspace and
+     * table arguments on the command line.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19401">CASSANDRA-19401</a>
+     */
+    @Test
+    public void importFromNonMatchingKeyspaceTableDir() throws Throwable
+    {
+        String table = "nonmatchingtable";
+        try
+        {
+            schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, table));
+
+            for (int i = 0; i < 10; i++)
+                execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, table), i, i);
+
+            ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, table);
+            Util.flush(cfs);
+
+            Set<SSTableReader> sstables = cfs.getLiveSSTables();
+            cfs.clearUnsafe();
+
+            File backupDir = moveToBackupDir(sstables, "randomdir1", "randomdir2");
+
+            assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, table)).size());
+
+            SSTableImporter importer = new SSTableImporter(cfs);
+            SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()).copyData(true).build();
+            List<String> failedDirectories = importer.importNewSSTables(options);
+            assertTrue(failedDirectories.isEmpty());
+            assertEquals(10, execute(String.format("select * from %s.%s", KEYSPACE, table)).size());
+
+            // files are left there as they were just copied
+            Assert.assertNotEquals(0, countFiles(backupDir));
+        }
+        finally
+        {
+            execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table));
+        }
+    }
+
     private static class MockCFS extends ColumnFamilyStore
     {
         public MockCFS(ColumnFamilyStore cfs, Directories dirs)