Make nodetool import congruent with the documentation by not relying on the folder structure of the imported SSTable files

nodetool import requires keyspace and table on the command line to import SSTables to. This
was not always working as specifying keypace and table while having SSTables located in the
directory structure which differed on keyspace and table (dir and parent dir) just
ignored them.

patch by Stefan Miklosovic; reviewed by Brandon Williams for CASSANDRA-19401
diff --git a/CHANGES.txt b/CHANGES.txt
index 5db267e..452751c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.13
+ * Make nodetool import congruent with the documentation by not relying on the folder structure of the imported SSTable files (CASSANDRA-19401)
  * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
  * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
  * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index cecc60d..88e8c1f 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -877,7 +877,21 @@
 
         public Map<Descriptor, Set<Component>> list()
         {
-            filter();
+            return list(false);
+        }
+
+        /**
+         * This method is used upon SSTable importing (nodetool import) as there is no strict requirement to
+         * place SSTables in a directory structure where name of a directory equals to table name
+         * and parent of such directory equals to keyspace name. For such cases, we want to include such SSTables too,
+         * rendering the parameter to be set to true.
+         *
+         * @param includeForeignTables whether descriptors not matching metadata of this lister should be included
+         * @return found descriptors and related set of components
+         */
+        public Map<Descriptor, Set<Component>> list(boolean includeForeignTables)
+        {
+            filter(includeForeignTables);
             return ImmutableMap.copyOf(components);
         }
 
@@ -895,7 +909,12 @@
 
         public List<File> listFiles()
         {
-            filter();
+            return listFiles(false);
+        }
+
+        public List<File> listFiles(boolean includeForeignTables)
+        {
+            filter(includeForeignTables);
             List<File> l = new ArrayList<>(nbFiles);
             for (Map.Entry<Descriptor, Set<Component>> entry : components.entrySet())
             {
@@ -907,7 +926,7 @@
             return l;
         }
 
-        private void filter()
+        private void filter(boolean includeForeignTables)
         {
             if (filtered)
                 return;
@@ -919,21 +938,21 @@
 
                 if (snapshotName != null)
                 {
-                    LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(), onTxnErr);
+                    LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(includeForeignTables), onTxnErr);
                     continue;
                 }
 
                 if (!onlyBackups)
-                    LifecycleTransaction.getFiles(location.toPath(), getFilter(), onTxnErr);
+                    LifecycleTransaction.getFiles(location.toPath(), getFilter(includeForeignTables), onTxnErr);
 
                 if (includeBackups)
-                    LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(), onTxnErr);
+                    LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(includeForeignTables), onTxnErr);
             }
 
             filtered = true;
         }
 
-        private BiPredicate<File, FileType> getFilter()
+        private BiPredicate<File, FileType> getFilter(boolean includeForeignTables)
         {
             // This function always return false since it adds to the components map
             return (file, type) ->
@@ -951,15 +970,31 @@
                         if (pair == null)
                             return false;
 
+                        Descriptor descriptor = null;
+
                         // we are only interested in the SSTable files that belong to the specific ColumnFamily
                         if (!pair.left.ksname.equals(metadata.keyspace) || !pair.left.cfname.equals(metadata.name))
-                            return false;
+                        {
+                            if (!includeForeignTables)
+                                return false;
 
-                        Set<Component> previous = components.get(pair.left);
+                            descriptor = new Descriptor(pair.left.version,
+                                                        pair.left.directory,
+                                                        metadata.keyspace,
+                                                        metadata.name,
+                                                        pair.left.generation,
+                                                        pair.left.formatType);
+                        }
+                        else
+                        {
+                            descriptor = pair.left;
+                        }
+
+                        Set<Component> previous = components.get(descriptor);
                         if (previous == null)
                         {
                             previous = new HashSet<>();
-                            components.put(pair.left, previous);
+                            components.put(descriptor, previous);
                         }
                         previous.add(pair.right);
                         nbFiles++;
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java
index 989ff12..c7c4f6b 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -86,7 +86,7 @@
             {
                 Directories.SSTableLister lister = listerPair.left;
                 String dir = listerPair.right;
-                for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+                for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
                 {
                     Descriptor descriptor = entry.getKey();
                     if (!currentDescriptors.contains(entry.getKey()))
@@ -124,7 +124,7 @@
 
             Set<MovedSSTable> movedSSTables = new HashSet<>();
             Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>();
-            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
             {
                 try
                 {
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java
index a760768..e812d0f 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -32,8 +32,8 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.cache.RowCacheKey;
@@ -55,6 +55,12 @@
 
 public class ImportTest extends CQLTester
 {
+    @After
+    public void afterTest()
+    {
+        SSTableReader.resetTidying();
+    }
+
     @Test
     public void basicImportByMovingTest() throws Throwable
     {
@@ -213,7 +219,7 @@
             assertTrue(sstable.isRepaired());
 
         getCurrentColumnFamilyStore().clearUnsafe();
-        backupdir = moveToBackupDir(sstables);
+        backupdir = moveToBackupDir(sstables, KEYSPACE, currentTable());
 
         options = SSTableImporter.Options.options(backupdir.toString()).clearRepaired(true).build();
         importer.importNewSSTables(options);
@@ -225,6 +231,11 @@
 
     private File moveToBackupDir(Set<SSTableReader> sstables) throws IOException
     {
+        return moveToBackupDir(sstables, KEYSPACE, currentTable());
+    }
+
+    private File moveToBackupDir(Set<SSTableReader> sstables, String keyspace, String table) throws IOException
+    {
         Path temp = Files.createTempDirectory("importtest");
         SSTableReader sst = sstables.iterator().next();
         String tabledir = sst.descriptor.directory.getName();
@@ -409,12 +420,12 @@
 
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
-        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1"));
-        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2"));
-        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.2"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 15), InetAddressAndPort.getByName("127.0.0.3"));
 
 
-        File backupdir = moveToBackupDir(sstables);
+        File backupdir = moveToBackupDir(sstables, KEYSPACE, currentTable());
         try
         {
             SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build();
@@ -534,6 +545,7 @@
         beforeFirstImport.forEach(s -> s.selfRef().release());
         options = SSTableImporter.Options.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).invalidateCaches(true).build();
         importer.importNewSSTables(options);
+        Thread.sleep(2000);
         assertEquals(10, CacheService.instance.rowCache.size());
         it = CacheService.instance.rowCache.keyIterator();
         while (it.hasNext())
@@ -682,7 +694,7 @@
                 Set<SSTableReader> sstables = cfs.getLiveSSTables();
                 cfs.clearUnsafe();
 
-                File backupDir = moveToBackupDir(sstables);
+                File backupDir = moveToBackupDir(sstables, KEYSPACE, table);
 
                 assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, table)).size());
 
@@ -704,6 +716,49 @@
         }
     }
 
+    /**
+     * This test verifies that we successfully import SSTables which are in a directory structure
+     * where table name and keyspace name (current dir name and parent dir name) do not match the keyspace and
+     * table arguments on the command line.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19401">CASSANDRA-19401</a>
+     */
+    @Test
+    public void importFromNonMatchingKeyspaceTableDir() throws Throwable
+    {
+        String table = "nonmatchingtable";
+        try
+        {
+            schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, table));
+
+            for (int i = 0; i < 10; i++)
+                execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, table), i, i);
+
+            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, table);
+            cfs.forceBlockingFlush();
+
+            Set<SSTableReader> sstables = cfs.getLiveSSTables();
+            cfs.clearUnsafe();
+
+            File backupDir = moveToBackupDir(sstables, "randomdir1", "randomdir2");
+
+            assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, table)).size());
+
+            SSTableImporter importer = new SSTableImporter(cfs);
+            SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()).copyData(true).build();
+            List<String> failedDirectories = importer.importNewSSTables(options);
+            assertTrue(failedDirectories.isEmpty());
+            assertEquals(10, execute(String.format("select * from %s.%s", KEYSPACE, table)).size());
+
+            // files are left there as they were just copied
+            Assert.assertNotEquals(0, countFiles(backupDir));
+        }
+        finally
+        {
+            execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table));
+        }
+    }
+
     private static class MockCFS extends ColumnFamilyStore
     {
         public MockCFS(ColumnFamilyStore cfs, Directories dirs)